diff --git a/src/allocator.cpp b/src/allocator.cpp index 68875154..045107bc 100644 --- a/src/allocator.cpp +++ b/src/allocator.cpp @@ -143,34 +143,83 @@ uint64_t allocator::get_free_count() return free; } +// FIXME: Move to utils? void bitmap_set(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity) { - if (start == 0) + if (start == 0 && len == 32*bitmap_granularity) + *((uint32_t*)bitmap) = UINT32_MAX; + else if (start == 0 && len == 64*bitmap_granularity) + *((uint64_t*)bitmap) = UINT64_MAX; + else { - if (len == 32*bitmap_granularity) + unsigned bit_start = start / bitmap_granularity; + unsigned bit_end = ((start + len) + bitmap_granularity - 1) / bitmap_granularity; + while (bit_start < bit_end) { - *((uint32_t*)bitmap) = UINT32_MAX; - return; - } - else if (len == 64*bitmap_granularity) - { - *((uint64_t*)bitmap) = UINT64_MAX; - return; - } - } - unsigned bit_start = start / bitmap_granularity; - unsigned bit_end = ((start + len) + bitmap_granularity - 1) / bitmap_granularity; - while (bit_start < bit_end) - { - if (!(bit_start & 7) && bit_end >= bit_start+8) - { - ((uint8_t*)bitmap)[bit_start / 8] = UINT8_MAX; - bit_start += 8; - } - else - { - ((uint8_t*)bitmap)[bit_start / 8] |= 1 << (bit_start % 8); - bit_start++; + if (!(bit_start & 7) && bit_end >= bit_start+8) + { + ((uint8_t*)bitmap)[bit_start / 8] = UINT8_MAX; + bit_start += 8; + } + else + { + ((uint8_t*)bitmap)[bit_start / 8] |= 1 << (bit_start % 8); + bit_start++; + } } } } + +void bitmap_clear(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity) +{ + if (start == 0 && len == 32*bitmap_granularity) + *((uint32_t*)bitmap) = 0; + else if (start == 0 && len == 64*bitmap_granularity) + *((uint64_t*)bitmap) = 0; + else + { + unsigned bit_start = start / bitmap_granularity; + unsigned bit_end = ((start + len) + bitmap_granularity - 1) / bitmap_granularity; + while (bit_start < bit_end) + { + if (!(bit_start & 7) && bit_end >= bit_start+8) + { + ((uint8_t*)bitmap)[bit_start / 8] = 0; + bit_start += 8; + } + else + { + ((uint8_t*)bitmap)[bit_start / 8] &= (0xFF ^ (1 << (bit_start % 8))); + bit_start++; + } + } + } +} + +bool bitmap_check(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity) +{ + bool r = false; + if (start == 0 && len == 32*bitmap_granularity) + r = !!*((uint32_t*)bitmap); + else if (start == 0 && len == 64*bitmap_granularity) + r = !!*((uint64_t*)bitmap); + else + { + unsigned bit_start = start / bitmap_granularity; + unsigned bit_end = ((start + len) + bitmap_granularity - 1) / bitmap_granularity; + while (bit_start < bit_end) + { + if (!(bit_start & 7) && bit_end >= bit_start+8) + { + r = r || !!((uint8_t*)bitmap)[bit_start / 8]; + bit_start += 8; + } + else + { + r = r || (((uint8_t*)bitmap)[bit_start / 8] & (1 << (bit_start % 8))); + bit_start++; + } + } + } + return r; +} diff --git a/src/allocator.h b/src/allocator.h index 1a759297..dd013b50 100644 --- a/src/allocator.h +++ b/src/allocator.h @@ -23,3 +23,5 @@ public: }; void bitmap_set(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity); +void bitmap_clear(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity); +bool bitmap_check(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity); diff --git a/src/blockstore_disk.cpp b/src/blockstore_disk.cpp index 9407aae0..9a2373a5 100644 --- a/src/blockstore_disk.cpp +++ b/src/blockstore_disk.cpp @@ -122,11 +122,6 @@ void blockstore_disk_t::parse_config(std::map & config { throw 
std::runtime_error("Checksum block size must be a divisor of data block size"); } - if (csum_block_size && csum_block_size != bitmap_granularity) - { - // FIXME: Support other checksum block sizes (with read-modify-write) - throw std::runtime_error("Checksum block sizes other than bitmap_granularity are not supported yet"); - } if (meta_device == "") { meta_device = data_device; @@ -144,7 +139,8 @@ void blockstore_disk_t::parse_config(std::map & config throw std::runtime_error("journal_offset must be a multiple of journal_block_size = "+std::to_string(journal_block_size)); } clean_entry_bitmap_size = data_block_size / bitmap_granularity / 8; - clean_dyn_size = clean_entry_bitmap_size + dirty_dyn_size(data_block_size); + clean_dyn_size = clean_entry_bitmap_size*2 + (csum_block_size + ? data_block_size/csum_block_size*(data_csum_type & 0xFF) : 0); clean_entry_size = sizeof(clean_disk_entry) + clean_dyn_size + 4 /*entry_csum*/; } diff --git a/src/blockstore_disk.h b/src/blockstore_disk.h index a5183f8c..67a50837 100644 --- a/src/blockstore_disk.h +++ b/src/blockstore_disk.h @@ -48,8 +48,12 @@ struct blockstore_disk_t void calc_lengths(bool skip_meta_check = false); void close_all(); - inline uint64_t dirty_dyn_size(uint64_t len) + inline uint64_t dirty_dyn_size(uint64_t offset, uint64_t len) { - return clean_entry_bitmap_size + (csum_block_size ? len/csum_block_size * (data_csum_type & 0xFF) : 0); + // Checksums may be partial if write is not aligned with csum_block_size + return clean_entry_bitmap_size + (csum_block_size + ? ((offset+len+csum_block_size-1)/csum_block_size - offset/csum_block_size) + * (data_csum_type & 0xFF) + : 0); } }; diff --git a/src/blockstore_flush.cpp b/src/blockstore_flush.cpp index c19bc05f..bc9bf20d 100644 --- a/src/blockstore_flush.cpp +++ b/src/blockstore_flush.cpp @@ -3,6 +3,9 @@ #include "blockstore_impl.h" +#define META_BLOCK_UNREAD 0 +#define META_BLOCK_READ 1 + journal_flusher_t::journal_flusher_t(blockstore_impl_t *bs) { this->bs = bs; @@ -38,6 +41,13 @@ journal_flusher_co::journal_flusher_co() bs->disk_error_abort("read operation during flush", data->res, data->iov.iov_len); wait_count--; }; + simple_callback_rj = [this](ring_data_t* data) + { + bs->live = true; + if (data->res != data->iov.iov_len) + bs->disk_error_abort("read operation during flush", data->res, data->iov.iov_len); + wait_journal_count--; + }; simple_callback_w = [this](ring_data_t* data) { bs->live = true; @@ -242,63 +252,102 @@ bool journal_flusher_t::try_find_older(std::map::iterat return found; } +bool journal_flusher_t::try_find_other(std::map::iterator & dirty_end, obj_ver_id & cur) +{ + int search_left = flush_queue.size() - 1; +#ifdef BLOCKSTORE_DEBUG + printf("Flusher overran writers (%lx:%lx v%lu, dirty_start=%08lx) - searching for older flushes (%d left)\n", + cur.oid.inode, cur.oid.stripe, cur.version, bs->journal.dirty_start, search_left); +#endif + while (search_left > 0) + { + cur.oid = flush_queue.front(); + cur.version = flush_versions[cur.oid]; + flush_queue.pop_front(); + flush_versions.erase(cur.oid); + dirty_end = bs->dirty_db.find(cur); + if (dirty_end != bs->dirty_db.end()) + { + if (dirty_end->second.journal_sector >= bs->journal.dirty_start && + (bs->journal.dirty_start >= bs->journal.used_start || + dirty_end->second.journal_sector < bs->journal.used_start)) + { +#ifdef BLOCKSTORE_DEBUG + printf("Write %lx:%lx v%lu is too new: offset=%08lx\n", cur.oid.inode, cur.oid.stripe, cur.version, dirty_end->second.journal_sector); +#endif + enqueue_flush(cur); + } + else + 
{ + auto repeat_it = sync_to_repeat.find(cur.oid); + if (repeat_it != sync_to_repeat.end()) + { + if (repeat_it->second < cur.version) + repeat_it->second = cur.version; + } + else + { + sync_to_repeat[cur.oid] = 0; + break; + } + } + } + search_left--; + } + if (search_left <= 0) + { +#ifdef BLOCKSTORE_DEBUG + printf("No older flushes, stopping\n"); +#endif + } + return search_left > 0; +} + #define await_sqe(label) \ resume_##label:\ sqe = bs->get_sqe();\ if (!sqe)\ {\ - wait_state = label;\ + wait_state = wait_base+label;\ return false;\ }\ data = ((ring_data_t*)sqe->user_data); -// FIXME: Implement batch flushing bool journal_flusher_co::loop() { + int wait_base = 0; // This is much better than implementing the whole function as an FSM // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ... - if (wait_state == 1) - goto resume_1; - else if (wait_state == 2) - goto resume_2; - else if (wait_state == 3) - goto resume_3; - else if (wait_state == 4) - goto resume_4; - else if (wait_state == 5) - goto resume_5; - else if (wait_state == 6) - goto resume_6; - else if (wait_state == 7) - goto resume_7; - else if (wait_state == 8) - goto resume_8; - else if (wait_state == 9) - goto resume_9; - else if (wait_state == 10) - goto resume_10; - else if (wait_state == 12) - goto resume_12; - else if (wait_state == 13) - goto resume_13; - else if (wait_state == 14) - goto resume_14; - else if (wait_state == 15) - goto resume_15; - else if (wait_state == 16) - goto resume_16; - else if (wait_state == 17) - goto resume_17; - else if (wait_state == 18) - goto resume_18; - else if (wait_state == 19) - goto resume_19; - else if (wait_state == 20) - goto resume_20; - else if (wait_state == 21) - goto resume_21; - else if (wait_state == 22) - goto resume_22; + // Or just C++ coroutines, but they require some wrappers + if (wait_state == 1) goto resume_1; + else if (wait_state == 2) goto resume_2; + else if (wait_state == 3) goto resume_3; + else if (wait_state == 4) goto resume_4; + else if (wait_state == 5) goto resume_5; + else if (wait_state == 6) goto resume_6; + else if (wait_state == 7) goto resume_7; + else if (wait_state == 8) goto resume_8; + else if (wait_state == 9) goto resume_9; + else if (wait_state == 10) goto resume_10; + else if (wait_state == 11) goto resume_11; + else if (wait_state == 12) goto resume_12; + else if (wait_state == 13) goto resume_13; + else if (wait_state == 14) goto resume_14; + else if (wait_state == 15) goto resume_15; + else if (wait_state == 16) goto resume_16; + else if (wait_state == 17) goto resume_17; + else if (wait_state == 18) goto resume_18; + else if (wait_state == 19) goto resume_19; + else if (wait_state == 20) goto resume_20; + else if (wait_state == 21) goto resume_21; + else if (wait_state == 22) goto resume_22; + else if (wait_state == 23) goto resume_23; + else if (wait_state == 24) goto resume_24; + else if (wait_state == 25) goto resume_25; + else if (wait_state == 26) goto resume_26; + else if (wait_state == 27) goto resume_27; + else if (wait_state == 28) goto resume_28; + else if (wait_state == 29) goto resume_29; resume_0: if (flusher->flush_queue.size() < flusher->min_flusher_count && !flusher->trim_wanted || !flusher->flush_queue.size() || !flusher->dequeuing) @@ -348,56 +397,12 @@ stop_flusher: // And it may even block writes if we don't flush the older version // (if it's in the beginning of the journal)... // So first try to find an older version of the same object to flush. 
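// Aside (illustrative, not part of the patch): the flusher above is a hand-rolled
// coroutine - a wait_state field plus resume_N labels, where each helper takes a
// wait_base so its local resume points don't collide with the caller's numbering.
// A minimal standalone sketch of that pattern; all names below are invented.
struct demo_co
{
    int wait_state = 0;
    int free_sqes = 0; // stands in for available io_uring SQEs

    bool step(int wait_base)
    {
        if (wait_state == wait_base) goto resume_0;
    resume_0:
        if (!free_sqes)
        {
            wait_state = wait_base+0; // remember where to continue
            return false;             // yield; caller re-enters loop() later
        }
        free_sqes--;
        return true;
    }

    bool loop()
    {
        if (wait_state == 1) goto resume_1;
    resume_1:
        if (!step(1))       // helper owns wait_state values starting at 1
            return false;
        wait_state = 0;     // finished, start from scratch next time
        return true;
    }
};

int main()
{
    demo_co co;
    while (!co.loop())   // first pass parks inside step()
        co.free_sqes = 1; // simulate a resource becoming available
    return 0;
}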
- bool found = flusher->try_find_older(dirty_end, cur); - if (!found) + if (!flusher->try_find_older(dirty_end, cur)) { // Try other objects flusher->sync_to_repeat.erase(cur.oid); - int search_left = flusher->flush_queue.size() - 1; -#ifdef BLOCKSTORE_DEBUG - printf("Flusher overran writers (%lx:%lx v%lu, dirty_start=%08lx) - searching for older flushes (%d left)\n", - cur.oid.inode, cur.oid.stripe, cur.version, bs->journal.dirty_start, search_left); -#endif - while (search_left > 0) + if (!flusher->try_find_other(dirty_end, cur)) { - cur.oid = flusher->flush_queue.front(); - cur.version = flusher->flush_versions[cur.oid]; - flusher->flush_queue.pop_front(); - flusher->flush_versions.erase(cur.oid); - dirty_end = bs->dirty_db.find(cur); - if (dirty_end != bs->dirty_db.end()) - { - if (dirty_end->second.journal_sector >= bs->journal.dirty_start && - (bs->journal.dirty_start >= bs->journal.used_start || - dirty_end->second.journal_sector < bs->journal.used_start)) - { -#ifdef BLOCKSTORE_DEBUG - printf("Write %lx:%lx v%lu is too new: offset=%08lx\n", cur.oid.inode, cur.oid.stripe, cur.version, dirty_end->second.journal_sector); -#endif - flusher->enqueue_flush(cur); - } - else - { - repeat_it = flusher->sync_to_repeat.find(cur.oid); - if (repeat_it != flusher->sync_to_repeat.end()) - { - if (repeat_it->second < cur.version) - repeat_it->second = cur.version; - } - else - { - flusher->sync_to_repeat[cur.oid] = 0; - break; - } - } - } - search_left--; - } - if (search_left <= 0) - { -#ifdef BLOCKSTORE_DEBUG - printf("No older flushes, stopping\n"); -#endif goto stop_flusher; } } @@ -406,19 +411,15 @@ stop_flusher: printf("Flushing %lx:%lx v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version); #endif flusher->active_flushers++; -resume_1: // Find it in clean_db { auto & clean_db = bs->clean_db_shard(cur.oid); auto clean_it = clean_db.find(cur.oid); + old_clean_ver = (clean_it != clean_db.end() ? clean_it->second.version : 0); old_clean_loc = (clean_it != clean_db.end() ? clean_it->second.location : UINT64_MAX); } - // Scan dirty versions of the object - if (!scan_dirty(1)) - { - wait_state += 1; - return false; - } + // Scan dirty versions of the object to determine what we need to read + scan_dirty(); // Writes and deletes shouldn't happen at the same time assert(!has_writes || !has_delete); if (!has_writes && !has_delete || has_delete && old_clean_loc == UINT64_MAX) @@ -444,117 +445,70 @@ resume_1: clean_loc = old_clean_loc; } } - // Also we need to submit metadata read(s). We do read-modify-write cycle(s) for every operation. - resume_2: - if (!modify_meta_read(clean_loc, meta_new, 2)) - { - wait_state += 2; + // Submit dirty data and old checksum data reads +resume_1: +resume_2: + if (!read_dirty(1)) return false; - } - if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) - { - resume_14: - if (!modify_meta_read(old_clean_loc, meta_old, 14)) - { - wait_state += 14; - return false; - } - } - else - meta_old.submitted = false; + // Also we may need to read metadata. We do read-modify-write cycle(s) for every operation. resume_3: - if (wait_count > 0) - { - wait_state = 3; + resume_4: + if (!modify_meta_do_reads(3)) return false; - } - if (meta_new.submitted) + // Now, if csum_block_size is > bitmap_granularity and if we are doing partial checksum block updates, + // perform a trick: clear bitmap bits in the metadata entry and recalculate block checksum with zeros + // in place of overwritten parts. 
Then, even if the actual partial update fully or partially fails, + // we'll have a correct checksum because it won't include overwritten parts! + // The same thing actually happens even when csum_block_size == bitmap_granularity, but in that case + // we never need to read (and thus verify) overwritten parts from the data device. + resume_5: + resume_6: + resume_7: + resume_8: + resume_9: + resume_10: + resume_11: + if (fill_incomplete && !clear_incomplete_csum_block_bits(5)) + return false; + // Wait for journal data reads if the journal is not inmemory + resume_12: + if (copy_count && !bs->journal.inmemory && wait_journal_count > 0) { - meta_new.it->second.state = 1; - bs->ringloop->wakeup(); - } - if (meta_old.submitted) - { - meta_old.it->second.state = 1; - bs->ringloop->wakeup(); - } - // Reads completed, submit writes and set bitmap bits - if (bs->dsk.clean_entry_bitmap_size || bs->dsk.csum_block_size) - { - new_clean_bitmap = (bs->inmemory_meta - ? (uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size + sizeof(clean_disk_entry) - : (uint8_t*)bs->clean_dyn_data + (clean_loc >> bs->dsk.block_order)*bs->dsk.clean_dyn_size); - if (bs->dsk.clean_entry_bitmap_size && clean_init_bitmap) - { - // Initial internal bitmap bits from the big write - memset(new_clean_bitmap, 0, bs->dsk.clean_entry_bitmap_size); - bitmap_set(new_clean_bitmap, clean_bitmap_offset, clean_bitmap_len, bs->dsk.bitmap_granularity); - } - } - // Copy initial (big_write) data checksums - if (bs->dsk.csum_block_size && clean_init_bitmap) - { - uint8_t *new_clean_data_csum = (uint8_t*)new_clean_bitmap + 2*bs->dsk.clean_entry_bitmap_size + - clean_bitmap_offset / bs->dsk.csum_block_size * (bs->dsk.data_csum_type & 0xFF); - uint64_t dyn_size = bs->dsk.dirty_dyn_size(clean_bitmap_len); - uint8_t *dyn_ptr = dyn_size > sizeof(void*) ? 
clean_init_data_csum : (uint8_t*)&clean_init_data_csum; - uint64_t csum_len = clean_bitmap_len / bs->dsk.csum_block_size * (bs->dsk.data_csum_type & 0xFF); - memcpy(new_clean_data_csum, dyn_ptr + bs->dsk.clean_entry_bitmap_size, csum_len); + wait_state = wait_base+12; + goto resume_12; } + // Submit data writes for (it = v.begin(); it != v.end(); it++) { - // Set internal bitmap bits - if (new_clean_bitmap) + if (it->copy_flags == COPY_BUF_JOURNAL || it->copy_flags == (COPY_BUF_JOURNAL|COPY_BUF_COALESCED)) { - bitmap_set(new_clean_bitmap, it->offset, it->len, bs->dsk.bitmap_granularity); + await_sqe(13); + data->iov = (struct iovec){ it->buf, (size_t)it->len }; + data->callback = simple_callback_w; + my_uring_prep_writev( + sqe, bs->dsk.data_fd, &data->iov, 1, bs->dsk.data_offset + clean_loc + it->offset + ); + wait_count++; } - // Copy small_write data checksums - if (bs->dsk.csum_block_size) - { - uint8_t *new_clean_data_csum = (uint8_t*)new_clean_bitmap + 2*bs->dsk.clean_entry_bitmap_size + - it->offset / bs->dsk.csum_block_size * (bs->dsk.data_csum_type & 0xFF); - uint64_t csum_len = it->len / bs->dsk.csum_block_size * (bs->dsk.data_csum_type & 0xFF); - memcpy(new_clean_data_csum, it->csum_buf, csum_len); - } - await_sqe(4); - data->iov = (struct iovec){ it->buf, (size_t)it->len }; - data->callback = simple_callback_w; - my_uring_prep_writev( - sqe, bs->dsk.data_fd, &data->iov, 1, bs->dsk.data_offset + clean_loc + it->offset - ); - wait_count++; } - // Wait for data writes before fsyncing it - resume_22: - if (wait_count > 0) - { - wait_state = 22; + // Wait for data writes and metadata reads + resume_14: + resume_15: + if (!wait_meta_reads(14)) return false; - } // Sync data before writing metadata resume_16: resume_17: resume_18: if (copy_count && !fsync_batch(false, 16)) - { - wait_state += 16; return false; - } - resume_5: - // Submit metadata writes, but only when data is written and fsynced - if (!bs->inmemory_meta && meta_new.it->second.state == 0 || wait_count > 0) - { - // metadata sector is still being read or data is still being written, wait for it - wait_state = 5; - return false; - } + // Modify the new metadata entry + update_metadata_entry(); + // Update clean_db - it must be equal to the metadata entry + update_clean_db(); + // And write metadata entries if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) { - if (!bs->inmemory_meta && meta_old.it->second.state == 0) - { - wait_state = 5; - return false; - } // zero out old metadata entry { clean_disk_entry *old_entry = (clean_disk_entry*)((uint8_t*)meta_old.buf + meta_old.pos*bs->dsk.clean_entry_size); @@ -567,111 +521,31 @@ resume_1: } } memset((uint8_t*)meta_old.buf + meta_old.pos*bs->dsk.clean_entry_size, 0, bs->dsk.clean_entry_size); - if (meta_old.sector != meta_new.sector) - { - await_sqe(15); - data->iov = (struct iovec){ meta_old.buf, bs->dsk.meta_block_size }; - data->callback = simple_callback_w; - my_uring_prep_writev( - sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + bs->dsk.meta_block_size + meta_old.sector - ); - wait_count++; - } + resume_19: + if (meta_old.sector != meta_new.sector && !write_meta_block(meta_old, 19)) + return false; } - if (has_delete) - { - clean_disk_entry *new_entry = (clean_disk_entry*)((uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size); - if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid) - { - printf("Fatal error (metadata corruption or bug): tried to delete metadata entry %lu (%lx:%lx v%lu) while deleting %lx:%lx\n", - clean_loc >> 
bs->dsk.block_order, new_entry->oid.inode, new_entry->oid.stripe, - new_entry->version, cur.oid.inode, cur.oid.stripe); - exit(1); - } - // zero out new metadata entry - memset((uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size, 0, bs->dsk.clean_entry_size); - } - else - { - clean_disk_entry *new_entry = (clean_disk_entry*)((uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size); - if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid) - { - printf("Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lu (%lx:%lx v%lu) with %lx:%lx v%lu\n", - clean_loc >> bs->dsk.block_order, new_entry->oid.inode, new_entry->oid.stripe, new_entry->version, - cur.oid.inode, cur.oid.stripe, cur.version); - exit(1); - } - new_entry->oid = cur.oid; - new_entry->version = cur.version; - if (!bs->inmemory_meta) - { - memcpy(&new_entry->bitmap, new_clean_bitmap, bs->dsk.clean_entry_bitmap_size); - } - // copy latest external bitmap/attributes - if (bs->dsk.clean_entry_bitmap_size) - { - uint64_t dyn_size = bs->dsk.dirty_dyn_size(dirty_end->second.len); - void *dyn_ptr = dyn_size > sizeof(void*) ? dirty_end->second.dyn_data : &dirty_end->second.dyn_data; - // copy bitmap - memcpy((uint8_t*)(new_entry+1) + bs->dsk.clean_entry_bitmap_size, dyn_ptr, bs->dsk.clean_entry_bitmap_size); - } - // calculate metadata entry checksum - uint32_t *new_entry_csum = (uint32_t*)((uint8_t*)new_entry + bs->dsk.clean_entry_size - 4); - *new_entry_csum = crc32c(0, new_entry, bs->dsk.clean_entry_size - 4); - } - await_sqe(6); - data->iov = (struct iovec){ meta_new.buf, bs->dsk.meta_block_size }; - data->callback = simple_callback_w; - my_uring_prep_writev( - sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + bs->dsk.meta_block_size + meta_new.sector - ); - wait_count++; - resume_7: + resume_20: + if (!write_meta_block(meta_new, 20)) + return false; + resume_21: if (wait_count > 0) { - wait_state = 7; + wait_state = wait_base+21; return false; } // Done, free all buffers - if (!bs->inmemory_meta) - { - meta_new.it->second.usage_count--; - if (meta_new.it->second.usage_count == 0) - { - free(meta_new.it->second.buf); - flusher->meta_sectors.erase(meta_new.it); - } - if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) - { - meta_old.it->second.usage_count--; - if (meta_old.it->second.usage_count == 0) - { - free(meta_old.it->second.buf); - flusher->meta_sectors.erase(meta_old.it); - } - } - } - for (it = v.begin(); it != v.end(); it++) - { - // Free it if it's not taken from the journal - if (it->buf && (!bs->journal.inmemory || it->buf < bs->journal.buffer || - it->buf >= (uint8_t*)bs->journal.buffer + bs->journal.len)) - { - free(it->buf); - } - } - v.clear(); + free_buffers(); // And sync metadata (in batches - not per each operation!) 
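// Aside (illustrative, not part of the patch): the per-entry checksum written above
// covers every byte of the clean metadata entry except its own trailing 4 bytes,
// mirroring crc32c(0, new_entry, clean_entry_size - 4) in the hunk. crc32c() is the
// codebase helper (declared here only for the sketch); the rest is assumed layout.
#include <cstdint>
#include <cstring>

uint32_t crc32c(uint32_t crc, const void *buf, size_t len); // provided by the codebase

// Seal a clean metadata entry: checksum all bytes except the trailing 4,
// then store the checksum in those 4 bytes (as update_metadata_entry() does).
static void seal_entry(uint8_t *entry, size_t clean_entry_size)
{
    uint32_t csum = crc32c(0, entry, clean_entry_size - 4);
    memcpy(entry + clean_entry_size - 4, &csum, 4);
}

// Verify the same way on read.
static bool check_entry(const uint8_t *entry, size_t clean_entry_size)
{
    uint32_t stored;
    memcpy(&stored, entry + clean_entry_size - 4, 4);
    return stored == crc32c(0, entry, clean_entry_size - 4);
}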
- resume_8: - resume_9: - resume_10: - if (!fsync_batch(true, 8)) - { - wait_state += 8; + resume_22: + resume_23: + resume_24: + if (!fsync_batch(true, 22)) return false; - } - // Update clean_db and dirty_db, free old data locations - update_clean_db(); + // Free the data block only when metadata is synced + free_data_blocks(); + // Erase dirty_db entries + bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc); #ifdef BLOCKSTORE_DEBUG printf("Flushed %lx:%lx v%lu (%d copies, wr:%d, del:%d), %ld left\n", cur.oid.inode, cur.oid.stripe, cur.version, copy_count, has_writes, has_delete, flusher->flush_queue.size()); @@ -688,78 +562,13 @@ resume_1: // Clear unused part of the journal every flushes if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval) || flusher->trim_wanted > 0) { - flusher->journal_trim_counter = 0; - new_trim_pos = bs->journal.get_trim_pos(); - if (new_trim_pos != bs->journal.used_start) - { - resume_19: - // Wait for other coroutines trimming the journal, if any - if (flusher->trimming) - { - wait_state = 19; - return false; - } - flusher->trimming = true; - // Recheck the position with the "lock" taken - new_trim_pos = bs->journal.get_trim_pos(); - if (new_trim_pos != bs->journal.used_start) - { - // First update journal "superblock" and only then update in memory - await_sqe(12); - *((journal_entry_start*)flusher->journal_superblock) = { - .crc32 = 0, - .magic = JOURNAL_MAGIC, - .type = JE_START, - .size = sizeof(journal_entry_start), - .reserved = 0, - .journal_start = new_trim_pos, - .version = JOURNAL_VERSION_V2, - .data_csum_type = bs->dsk.data_csum_type, - .csum_block_size = bs->dsk.csum_block_size, - }; - ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock); - data->iov = (struct iovec){ flusher->journal_superblock, bs->dsk.journal_block_size }; - data->callback = simple_callback_w; - my_uring_prep_writev(sqe, bs->dsk.journal_fd, &data->iov, 1, bs->journal.offset); - wait_count++; - resume_13: - if (wait_count > 0) - { - wait_state = 13; - return false; - } - if (!bs->disable_journal_fsync) - { - await_sqe(20); - my_uring_prep_fsync(sqe, bs->dsk.journal_fd, IORING_FSYNC_DATASYNC); - data->iov = { 0 }; - data->callback = simple_callback_w; - resume_21: - if (wait_count > 0) - { - wait_state = 21; - return false; - } - } - if (new_trim_pos < bs->journal.used_start - ? 
(bs->journal.dirty_start >= bs->journal.used_start || bs->journal.dirty_start < new_trim_pos) - : (bs->journal.dirty_start >= bs->journal.used_start && bs->journal.dirty_start < new_trim_pos)) - { - bs->journal.dirty_start = new_trim_pos; - } - bs->journal.used_start = new_trim_pos; -#ifdef BLOCKSTORE_DEBUG - printf("Journal trimmed to %08lx (next_free=%08lx dirty_start=%08lx)\n", bs->journal.used_start, bs->journal.next_free, bs->journal.dirty_start); -#endif - if (bs->journal.flush_journal && !flusher->flush_queue.size()) - { - assert(bs->journal.used_start == bs->journal.next_free); - printf("Journal flushed\n"); - exit(0); - } - } - flusher->trimming = false; - } + resume_25: + resume_26: + resume_27: + resume_28: + resume_29: + if (!trim_journal(25)) + return false; } // All done flusher->active_flushers--; @@ -769,21 +578,325 @@ resume_1: return true; } -bool journal_flusher_co::scan_dirty(int wait_base) +void journal_flusher_co::update_metadata_entry() +{ + clean_disk_entry *new_entry = (clean_disk_entry*)((uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size); + if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid) + { + printf( + has_delete + ? "Fatal error (metadata corruption or bug): tried to delete metadata entry %lu (%lx:%lx v%lu) while deleting %lx:%lx v%lu\n" + : "Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lu (%lx:%lx v%lu) with %lx:%lx v%lu\n", + clean_loc >> bs->dsk.block_order, new_entry->oid.inode, new_entry->oid.stripe, + new_entry->version, cur.oid.inode, cur.oid.stripe, cur.version + ); + exit(1); + } + if (has_delete) + { + // Zero out the new metadata entry + memset((uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size, 0, bs->dsk.clean_entry_size); + } + else + { + if (bs->dsk.csum_block_size) + { + // Copy the whole old metadata entry before updating it if the updated object is being read + obj_ver_id ov = { .oid = new_entry->oid, .version = new_entry->version }; + auto uo_it = bs->used_clean_objects.upper_bound(ov); + if (uo_it != bs->used_clean_objects.begin()) + { + uo_it--; + if (uo_it->first.oid == new_entry->oid) + { + uint8_t *meta_copy = (uint8_t*)malloc_or_die(bs->dsk.clean_entry_size); + memcpy(meta_copy, new_entry, bs->dsk.clean_entry_size); + // The reads should free all metadata entry backups when they don't need them anymore + if (uo_it->first.version < new_entry->version) + { + // If ==, write in place + uo_it++; + bs->used_clean_objects.insert(uo_it, std::make_pair(ov, (used_clean_obj_t){ + .meta = meta_copy, + })); + } + else + { + uo_it->second.meta = meta_copy; + } + } + } + } + // Set initial internal bitmap bits from the big write + if (clean_init_bitmap) + { + memset(new_clean_bitmap, 0, bs->dsk.clean_entry_bitmap_size); + bitmap_set(new_clean_bitmap, clean_bitmap_offset, clean_bitmap_len, bs->dsk.bitmap_granularity); + } + for (auto it = v.begin(); it != v.end(); it++) + { + // Set internal bitmap bits from small writes + if (it->copy_flags == COPY_BUF_JOURNAL || it->copy_flags == (COPY_BUF_JOURNAL|COPY_BUF_COALESCED)) + bitmap_set(new_clean_bitmap, it->offset, it->len, bs->dsk.bitmap_granularity); + } + // Copy latest external bitmap/attributes + { + uint64_t dyn_size = bs->dsk.dirty_dyn_size(dirty_end->second.offset, dirty_end->second.len); + void *dyn_ptr = dyn_size > sizeof(void*) ? 
dirty_end->second.dyn_data : &dirty_end->second.dyn_data; + memcpy(new_clean_bitmap + bs->dsk.clean_entry_bitmap_size, dyn_ptr, bs->dsk.clean_entry_bitmap_size); + } + // Copy initial (big_write) data checksums + if (bs->dsk.csum_block_size && clean_init_bitmap) + { + uint8_t *new_clean_data_csum = new_clean_bitmap + 2*bs->dsk.clean_entry_bitmap_size + + clean_bitmap_offset / bs->dsk.csum_block_size * (bs->dsk.data_csum_type & 0xFF); + // big_write partial checksums are calculated from a padded csum_block_size, we can just copy them + memset(new_clean_data_csum, 0, bs->dsk.data_block_size / bs->dsk.csum_block_size * (bs->dsk.data_csum_type & 0xFF)); + uint64_t dyn_size = bs->dsk.dirty_dyn_size(clean_bitmap_offset, clean_bitmap_len); + uint32_t *csums = (uint32_t*)(clean_init_dyn_ptr + bs->dsk.clean_entry_bitmap_size); + memcpy(new_clean_data_csum, csums, dyn_size - bs->dsk.clean_entry_bitmap_size); + } + // Calculate or copy small_write checksums + uint32_t *new_data_csums = (uint32_t*)(new_clean_bitmap + 2*bs->dsk.clean_entry_bitmap_size); + if (bs->dsk.csum_block_size) + calc_block_checksums(new_data_csums, false); + // Update entry + new_entry->oid = cur.oid; + new_entry->version = cur.version; + if (!bs->inmemory_meta) + memcpy(&new_entry->bitmap, new_clean_bitmap, bs->dsk.clean_dyn_size); + // Calculate metadata entry checksum + uint32_t *new_entry_csum = (uint32_t*)((uint8_t*)new_entry + bs->dsk.clean_entry_size - 4); + *new_entry_csum = crc32c(0, new_entry, bs->dsk.clean_entry_size - 4); + } +} + +void journal_flusher_co::free_buffers() +{ + if (!bs->inmemory_meta) + { + meta_new.it->second.usage_count--; + if (meta_new.it->second.usage_count == 0) + { + free(meta_new.it->second.buf); + flusher->meta_sectors.erase(meta_new.it); + } + if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) + { + meta_old.it->second.usage_count--; + if (meta_old.it->second.usage_count == 0) + { + free(meta_old.it->second.buf); + flusher->meta_sectors.erase(meta_old.it); + } + } + } + for (auto it = v.begin(); it != v.end(); it++) + { + // Free it if it's not taken from the journal + if (it->buf && (it->copy_flags == COPY_BUF_JOURNAL || (it->copy_flags & COPY_BUF_CSUM_FILL)) && + (!bs->journal.inmemory || it->buf < bs->journal.buffer || it->buf >= (uint8_t*)bs->journal.buffer + bs->journal.len)) + { + free(it->buf); + } + } + v.clear(); +} + +bool journal_flusher_co::write_meta_block(flusher_meta_write_t & meta_block, int wait_base) { if (wait_state == wait_base) - { goto resume_0; + await_sqe(0); + data->iov = (struct iovec){ meta_block.buf, bs->dsk.meta_block_size }; + data->callback = simple_callback_w; + my_uring_prep_writev( + sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + bs->dsk.meta_block_size + meta_block.sector + ); + wait_count++; + return true; +} + +bool journal_flusher_co::clear_incomplete_csum_block_bits(int wait_base) +{ + if (wait_state == wait_base) goto resume_0; + else if (wait_state == wait_base+1) goto resume_1; + else if (wait_state == wait_base+2) goto resume_2; + else if (wait_state == wait_base+3) goto resume_3; + else if (wait_state == wait_base+4) goto resume_4; + else if (wait_state == wait_base+5) goto resume_5; + else if (wait_state == wait_base+6) goto resume_6; + cleared_incomplete = false; + for (auto it = v.begin(); it != v.end(); it++) + { + if ((it->copy_flags == COPY_BUF_JOURNAL || it->copy_flags == (COPY_BUF_JOURNAL|COPY_BUF_COALESCED)) && + bitmap_check(new_clean_bitmap, it->offset, it->len, bs->dsk.bitmap_granularity)) + { + cleared_incomplete 
= true; + break; + } } + if (cleared_incomplete) + { + // This modification may only happen in place + assert(old_clean_loc == clean_loc); + // Wait for data writes and metadata reads + resume_0: + resume_1: + if (!wait_meta_reads(wait_base+0)) + return false; + // Verify data checksums + for (i = v.size()-1; i >= 0 && (v[i].copy_flags & COPY_BUF_CSUM_FILL); i--) + { + // If we encounter bad checksums during flush, we still update the bad block, + // but intentionally mangle checksums to avoid hiding the corruption. + iovec iov = { .iov_base = v[i].buf, .iov_len = v[i].len }; + if (!(v[i].copy_flags & COPY_BUF_JOURNAL)) + { + assert(!(v[i].offset % bs->dsk.csum_block_size)); + assert(!(v[i].len % bs->dsk.csum_block_size)); + bs->verify_padded_checksums(new_clean_bitmap, v[i].offset, &iov, 1, [&](uint32_t bad_block, uint32_t calc_csum, uint32_t stored_csum) + { + printf("Checksum mismatch in object %lx:%lx v%lu in data area at offset 0x%lx+0x%x: got %08x, expected %08x\n", + cur.oid.inode, cur.oid.stripe, old_clean_ver, old_clean_loc, bad_block, calc_csum, stored_csum); + for (uint32_t j = 0; j < bs->dsk.csum_block_size; j += bs->dsk.bitmap_granularity) + { + // Simplest method of mangling: flip one byte in every sector + ((uint8_t*)v[i].buf)[j+bad_block-v[i].offset] ^= 0xff; + } + }); + } + else + { + bs->verify_journal_checksums(v[i].csum_buf, v[i].offset, &iov, 1, [&](uint32_t bad_block, uint32_t calc_csum, uint32_t stored_csum) + { + printf("Checksum mismatch in object %lx:%lx v%lu in journal at offset 0x%lx+0x%x: got %08x, expected %08x\n", + cur.oid.inode, cur.oid.stripe, old_clean_ver, + v[i].disk_offset, bad_block, calc_csum, stored_csum); + uint32_t bad_block_end = ((bad_block/bs->dsk.csum_block_size)+1)*bs->dsk.csum_block_size - + v[i].offset % bs->dsk.csum_block_size; + if (bad_block_end > v[i].offset+v[i].len) + bad_block_end = v[i].offset+v[i].len; + for (uint32_t j = bad_block; j < bad_block_end; j += bs->dsk.bitmap_granularity) + { + // Simplest method of mangling: flip one byte in every sector + ((uint8_t*)v[i].buf)[j] ^= 0xff; + } + }); + } + } + // Actually clear bits + for (auto it = v.begin(); it != v.end(); it++) + { + if (it->copy_flags == COPY_BUF_JOURNAL || it->copy_flags == (COPY_BUF_JOURNAL|COPY_BUF_COALESCED)) + bitmap_clear(new_clean_bitmap, it->offset, it->len, bs->dsk.bitmap_granularity); + } + { + clean_disk_entry *new_entry = (clean_disk_entry*)((uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size); + if (new_entry->oid != cur.oid) + { + printf( + "Fatal error (metadata corruption or bug): tried to make holes in %lu (%lx:%lx v%lu) with %lx:%lx v%lu\n", + clean_loc >> bs->dsk.block_order, new_entry->oid.inode, new_entry->oid.stripe, + new_entry->version, cur.oid.inode, cur.oid.stripe, cur.version + ); + } + assert(new_entry->oid == cur.oid); + // Calculate block checksums with new holes + uint32_t *new_data_csums = (uint32_t*)(new_clean_bitmap + 2*bs->dsk.clean_entry_bitmap_size); + calc_block_checksums(new_data_csums, true); + if (!bs->inmemory_meta) + memcpy(&new_entry->bitmap, new_clean_bitmap, bs->dsk.clean_dyn_size); + // calculate metadata entry checksum + uint32_t *new_entry_csum = (uint32_t*)((uint8_t*)new_entry + bs->dsk.clean_entry_size - 4); + *new_entry_csum = crc32c(0, new_entry, bs->dsk.clean_entry_size - 4); + } + // Write and fsync the modified metadata entry + resume_2: + if (!write_meta_block(meta_new, wait_base+2)) + return false; + resume_3: + if (wait_count > 0) + { + wait_state = wait_base+3; + return false; + } + resume_4: + 
resume_5: + resume_6: + if (!fsync_batch(true, wait_base+4)) + return false; + } + return true; +} + +void journal_flusher_co::calc_block_checksums(uint32_t *new_data_csums, bool skip_overwrites) +{ + uint64_t block_offset = 0; + uint32_t block_done = 0; + uint32_t block_csum = 0; + for (auto it = v.begin(); it != v.end(); it++) + { + if (it->copy_flags & COPY_BUF_CSUM_FILL) + break; + if (block_done == 0) + { + // `v` should contain aligned items, possibly split into pieces + assert(!(it->offset % bs->dsk.csum_block_size)); + block_offset = it->offset; + } + bool zero = (it->copy_flags & COPY_BUF_ZERO) || (skip_overwrites && (it->copy_flags & COPY_BUF_JOURNAL)); + auto len = it->len; + while ((block_done+len) >= bs->dsk.csum_block_size) + { + if (!skip_overwrites && !block_done && it->csum_buf) + { + // We may take existing checksums if an overwrite contains a full block + auto full_csum_offset = (it->offset+it->len-len+bs->dsk.csum_block_size-1) / bs->dsk.csum_block_size + - it->offset / bs->dsk.csum_block_size; + auto full_csum_count = len/bs->dsk.csum_block_size; + memcpy(new_data_csums + block_offset/bs->dsk.csum_block_size, + it->csum_buf + full_csum_offset*4, full_csum_count*4); + len -= full_csum_count*bs->dsk.csum_block_size; + block_offset += full_csum_count*bs->dsk.csum_block_size; + } + else + { + auto cur_len = bs->dsk.csum_block_size-block_done; + block_csum = zero + ? crc32c_pad(block_csum, NULL, 0, cur_len, 0) + : crc32c(block_csum, (uint8_t*)it->buf+(it->len-len), cur_len); + new_data_csums[block_offset / bs->dsk.csum_block_size] = block_csum; + block_csum = 0; + block_done = 0; + block_offset += bs->dsk.csum_block_size; + len -= cur_len; + } + } + if (len > 0) + { + block_csum = zero + ? crc32c_pad(block_csum, NULL, 0, len, 0) + : crc32c(block_csum, (uint8_t*)it->buf+(it->len-len), len); + block_done += len; + } + } + // `v` should contain aligned items, possibly split into pieces + assert(!block_done); +} + +void journal_flusher_co::scan_dirty() +{ dirty_it = dirty_start = dirty_end; v.clear(); - wait_count = 0; copy_count = 0; clean_loc = UINT64_MAX; has_delete = false; has_writes = false; skip_copy = false; clean_init_bitmap = false; + fill_incomplete = false; + read_to_fill_incomplete = 0; while (1) { if (!IS_STABLE(dirty_it->second.state)) @@ -797,58 +910,91 @@ bool journal_flusher_co::scan_dirty(int wait_base) } else if (IS_JOURNAL(dirty_it->second.state) && !skip_copy) { - // First we submit all reads + // Partial dirty overwrite has_writes = true; if (dirty_it->second.len != 0) { - offset = dirty_it->second.offset; - end_offset = dirty_it->second.offset + dirty_it->second.len; - it = v.begin(); + uint64_t offset = dirty_it->second.offset; + uint64_t end_offset = dirty_it->second.offset + dirty_it->second.len; + uint64_t blk_begin = 0, blk_end = 0; + uint8_t *blk_buf = NULL; + auto it = v.begin(); while (end_offset > offset) { - for (; it != v.end(); it++) + for (; it != v.end() && !(it->copy_flags & COPY_BUF_CSUM_FILL); it++) if (it->offset+it->len > offset) break; // If all items end before offset or if the found item starts after end_offset, just insert the buffer // If (offset < it->offset < end_offset) insert (offset..it->offset) part // If (it->offset <= offset <= it->offset+it->len) then just skip to it->offset+it->len - if (it == v.end() || it->offset > offset) + if (it == v.end() || (it->copy_flags & COPY_BUF_CSUM_FILL) || it->offset > offset) { - submit_offset = dirty_it->second.location + offset - dirty_it->second.offset; - submit_len = it == v.end() || 
it->offset >= end_offset ? end_offset-offset : it->offset-offset; - it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len }); + uint64_t submit_len = it == v.end() || (it->copy_flags & COPY_BUF_CSUM_FILL) || + it->offset >= end_offset ? end_offset-offset : it->offset-offset; + uint64_t submit_offset = dirty_it->second.location + offset - dirty_it->second.offset; copy_count++; - if (bs->dsk.csum_block_size) - { - uint64_t dyn_size = bs->dsk.dirty_dyn_size(dirty_it->second.len); - // FIXME Remove this > sizeof(void*) inline perversion from everywhere. - // I think it doesn't matter but I couldn't stop myself from implementing it :) - uint8_t *dyn_from = (uint8_t*)(dyn_size > sizeof(void*) ? dirty_it->second.dyn_data : &dirty_it->second.dyn_data) + - bs->dsk.clean_entry_bitmap_size + - (offset - dirty_it->second.offset) / bs->dsk.csum_block_size * (bs->dsk.data_csum_type & 0xFF); - it->csum_buf = dyn_from; - } + it = v.insert(it, (copy_buffer_t){ + .copy_flags = COPY_BUF_JOURNAL, + .offset = offset, + .len = submit_len, + .disk_offset = submit_offset, + }); if (bs->journal.inmemory) { // Take it from memory, don't copy it it->buf = (uint8_t*)bs->journal.buffer + submit_offset; } - else + if (bs->dsk.csum_block_size) { - // Read it from disk - it->buf = memalign_or_die(MEM_ALIGNMENT, submit_len); - await_sqe(0); - data->iov = (struct iovec){ it->buf, (size_t)submit_len }; - data->callback = simple_callback_r; - my_uring_prep_readv( - sqe, bs->dsk.journal_fd, &data->iov, 1, bs->journal.offset + submit_offset - ); - wait_count++; + if (offset % bs->dsk.csum_block_size || submit_len % bs->dsk.csum_block_size) + { + // Small write not aligned for checksums. We may have to pad it + fill_incomplete = true; + } + // FIXME Remove this > sizeof(void*) inline perversion from everywhere. + // I think it doesn't matter but I couldn't stop myself from implementing it :) + uint64_t dyn_size = bs->dsk.dirty_dyn_size(dirty_it->second.offset, dirty_it->second.len); + uint8_t* dyn_from = (uint8_t*)(dyn_size > sizeof(void*) ? dirty_it->second.dyn_data : &dirty_it->second.dyn_data) + + bs->dsk.clean_entry_bitmap_size; + it->csum_buf = dyn_from + (it->offset/bs->dsk.csum_block_size - + dirty_it->second.offset/bs->dsk.csum_block_size) * (bs->dsk.data_csum_type & 0xFF); + if (!bs->journal.inmemory) + { + if (offset < blk_end) + { + // Already being read as a part of the previous checksum block series + it->buf = blk_buf + offset - blk_begin; + it->copy_flags |= COPY_BUF_COALESCED; + if (offset+submit_len > blk_end) + it->len = blk_end-offset; + } + else if (offset % bs->dsk.csum_block_size || submit_len % bs->dsk.csum_block_size) + { + // We don't use fill_partial_checksum_blocks for journal because journal writes never have holes (internal bitmap) + blk_begin = (offset/bs->dsk.csum_block_size) * bs->dsk.csum_block_size; + blk_begin = blk_begin < dirty_it->second.offset ? dirty_it->second.offset : blk_begin; + blk_end = ((offset+submit_len-1)/bs->dsk.csum_block_size + 1) * bs->dsk.csum_block_size; + blk_end = blk_end > end_offset ? 
end_offset : blk_end; + if (blk_begin < offset || blk_end > offset+submit_len) + { + blk_buf = (uint8_t*)memalign_or_die(MEM_ALIGNMENT, blk_end-blk_begin); + it->buf = blk_buf + offset - blk_begin; + it->copy_flags |= COPY_BUF_COALESCED; + v.push_back((copy_buffer_t){ + .copy_flags = COPY_BUF_JOURNAL|COPY_BUF_CSUM_FILL, + .offset = blk_begin, + .len = blk_end-blk_begin, + .disk_offset = dirty_it->second.location + blk_begin - dirty_it->second.offset, + .buf = blk_buf, + .csum_buf = (dyn_from + (blk_begin/bs->dsk.csum_block_size - + dirty_it->second.offset/bs->dsk.csum_block_size) * (bs->dsk.data_csum_type & 0xFF)), + }); + } + } + } } } offset = it->offset+it->len; - if (it == v.end()) - break; } } } @@ -860,7 +1006,8 @@ bool journal_flusher_co::scan_dirty(int wait_base) clean_init_bitmap = true; clean_bitmap_offset = dirty_it->second.offset; clean_bitmap_len = dirty_it->second.len; - clean_init_data_csum = (uint8_t*)dirty_it->second.dyn_data; + clean_init_dyn_ptr = bs->dsk.dirty_dyn_size(clean_bitmap_offset, clean_bitmap_len) > sizeof(void*) + ? (uint8_t*)dirty_it->second.dyn_data : (uint8_t*)&dirty_it->second.dyn_data; skip_copy = true; } else if (IS_DELETE(dirty_it->second.state) && !skip_copy) @@ -880,15 +1027,159 @@ bool journal_flusher_co::scan_dirty(int wait_base) break; } } + if (fill_incomplete && !clean_init_bitmap) + { + // Rescan and fill incomplete writes with old data to calculate checksums + uint8_t *bmp_ptr = bs->get_clean_entry_bitmap(old_clean_loc, 0); + uint64_t fulfilled = 0; + read_to_fill_incomplete = bs->fill_partial_checksum_blocks( + v, fulfilled, bmp_ptr, NULL, v[0].offset/bs->dsk.csum_block_size * bs->dsk.csum_block_size, + ((v[v.size()-1].offset+v[v.size()-1].len-1) / bs->dsk.csum_block_size + 1) * bs->dsk.csum_block_size + ); + } + else if (fill_incomplete && clean_init_bitmap) + { + // If we actually have partial checksum block overwrites AND a new clean_loc + // at the same time then we can't use our fancy checksum block mutation algorithm. + // So in this case we'll have to first flush the clean write separately. 
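// Aside (illustrative, not part of the patch): the "checksum block mutation" trick
// described above recomputes a csum_block_size checksum with zeros substituted for
// the overwritten range, so the stored checksum never vouches for bytes that were
// not read and verified. The patch does this with crc32c_pad() to avoid a temporary
// buffer; this sketch shows the equivalent idea with an explicit zero buffer.
// crc32c() is the codebase helper; sizes and names here are assumptions.
#include <cstddef>
#include <cstdint>
#include <vector>

uint32_t crc32c(uint32_t crc, const void *buf, size_t len); // provided by the codebase

static uint32_t csum_block_with_hole(const uint8_t *block, size_t block_size,
    size_t hole_off, size_t hole_len)
{
    uint32_t crc = crc32c(0, block, hole_off);        // part before the hole
    std::vector<uint8_t> zeros(hole_len, 0);
    crc = crc32c(crc, zeros.data(), hole_len);        // overwritten part counted as zeros
    crc = crc32c(crc, block + hole_off + hole_len,
        block_size - hole_off - hole_len);            // part after the hole
    return crc;
}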
+ while (!IS_BIG_WRITE(dirty_end->second.state)) + { + assert(dirty_end != bs->dirty_db.begin()); + dirty_end--; + } + flusher->enqueue_flush(cur); + cur.version = dirty_end->first.version; +#ifdef BLOCKSTORE_DEBUG + printf("Partial checksum block overwrites found - rewinding flush back to %lx:%lx v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version); +#endif + v.clear(); + copy_count = 0; + fill_incomplete = false; + read_to_fill_incomplete = 0; + } +} + +bool journal_flusher_co::read_dirty(int wait_base) +{ + if (wait_state == wait_base) goto resume_0; + else if (wait_state == wait_base+1) goto resume_1; + wait_count = 0; + if (bs->journal.inmemory && !read_to_fill_incomplete) + { + // Happy path: nothing to read :) + return true; + } + for (i = 1; i <= v.size() && (v[v.size()-i].copy_flags & COPY_BUF_CSUM_FILL); i++) + { + if (v[v.size()-i].copy_flags & COPY_BUF_JOURNAL) + continue; + // Read old data from disk to calculate checksums + await_sqe(0); + auto & vi = v[v.size()-i]; + assert(vi.len != 0); + vi.buf = memalign_or_die(MEM_ALIGNMENT, vi.len); + data->iov = (struct iovec){ vi.buf, vi.len }; + data->callback = simple_callback_r; + my_uring_prep_readv( + sqe, bs->dsk.data_fd, &data->iov, 1, bs->dsk.data_offset + old_clean_loc + vi.offset + ); + wait_count++; + bs->find_holes(v, vi.offset, vi.offset+vi.len, [this, buf = (uint8_t*)vi.buf-vi.offset](int pos, bool alloc, uint32_t cur_start, uint32_t cur_end) + { + if (!alloc) + { + v.insert(v.begin()+pos, (copy_buffer_t){ + .copy_flags = COPY_BUF_DATA, + .offset = cur_start, + .len = cur_end-cur_start, + .buf = buf+cur_start, + }); + return 1; + } + return 0; + }); + } + if (!bs->journal.inmemory) + { + for (i = 0; i < v.size(); i++) + { + if (v[i].copy_flags == COPY_BUF_JOURNAL || + v[i].copy_flags == (COPY_BUF_JOURNAL | COPY_BUF_CSUM_FILL)) + { + // Read journal data from disk + if (!v[i].buf) + v[i].buf = memalign_or_die(MEM_ALIGNMENT, v[i].len); + await_sqe(1); + data->iov = (struct iovec){ v[i].buf, (size_t)v[i].len }; + data->callback = simple_callback_rj; + my_uring_prep_readv( + sqe, bs->dsk.journal_fd, &data->iov, 1, bs->journal.offset + v[i].disk_offset + ); + wait_journal_count++; + } + } + } + return true; +} + +bool journal_flusher_co::modify_meta_do_reads(int wait_base) +{ + if (wait_state == wait_base) goto resume_0; + else if (wait_state == wait_base+1) goto resume_1; +resume_0: + if (!modify_meta_read(clean_loc, meta_new, wait_base+0)) + return false; + new_clean_bitmap = (bs->inmemory_meta + ? 
(uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size + sizeof(clean_disk_entry) + : (uint8_t*)bs->clean_dyn_data + (clean_loc >> bs->dsk.block_order)*bs->dsk.clean_dyn_size); + if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) + { + resume_1: + if (!modify_meta_read(old_clean_loc, meta_old, wait_base+1)) + return false; + } + else + meta_old.submitted = false; + return true; +} + +bool journal_flusher_co::wait_meta_reads(int wait_base) +{ + if (wait_state == wait_base) goto resume_0; + else if (wait_state == wait_base+1) goto resume_1; +resume_0: + if (wait_count > 0) + { + wait_state = wait_base+0; + return false; + } + // Our own reads completed + if (meta_new.submitted) + { + meta_new.it->second.state = META_BLOCK_READ; + bs->ringloop->wakeup(); + } + if (meta_old.submitted) + { + meta_old.it->second.state = META_BLOCK_READ; + bs->ringloop->wakeup(); + } +resume_1: + if (!bs->inmemory_meta && (meta_new.it->second.state == META_BLOCK_UNREAD || + (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) && meta_old.it->second.state == META_BLOCK_UNREAD)) + { + // Metadata block is being read by another coroutine + wait_state = wait_base+1; + return false; + } + // All reads completed return true; } bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base) { if (wait_state == wait_base) - { goto resume_0; - } // We must check if the same sector is already in memory if we don't keep all metadata in memory all the time. // And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot, // so I'll avoid it as long as I can. @@ -908,7 +1199,7 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_ wr.it = flusher->meta_sectors.emplace(wr.sector, (meta_sector_t){ .offset = wr.sector, .len = bs->dsk.meta_block_size, - .state = 0, // 0 = not read yet + .state = META_BLOCK_UNREAD, // 0 = not read yet .buf = wr.buf, .usage_count = 1, }).first; @@ -931,28 +1222,10 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_ void journal_flusher_co::update_clean_db() { - if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) - { -#ifdef BLOCKSTORE_DEBUG - printf("Free block %lu from %lx:%lx v%lu (new location is %lu)\n", - old_clean_loc >> bs->dsk.block_order, - cur.oid.inode, cur.oid.stripe, cur.version, - clean_loc >> bs->dsk.block_order); -#endif - bs->data_alloc->set(old_clean_loc >> bs->dsk.block_order, false); - } auto & clean_db = bs->clean_db_shard(cur.oid); if (has_delete) { - auto clean_it = clean_db.find(cur.oid); - clean_db.erase(clean_it); -#ifdef BLOCKSTORE_DEBUG - printf("Free block %lu from %lx:%lx v%lu (delete)\n", - clean_loc >> bs->dsk.block_order, - cur.oid.inode, cur.oid.stripe, cur.version); -#endif - bs->data_alloc->set(clean_loc >> bs->dsk.block_order, false); - clean_loc = UINT64_MAX; + clean_db.erase(cur.oid); } else { @@ -961,17 +1234,49 @@ void journal_flusher_co::update_clean_db() .location = clean_loc, }; } - bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc); +} + +void journal_flusher_co::free_data_blocks() +{ + if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) + { + auto uo_it = bs->used_clean_objects.find((obj_ver_id){ .oid = cur.oid, .version = old_clean_ver }); + bool used = uo_it != bs->used_clean_objects.end(); +#ifdef BLOCKSTORE_DEBUG + printf("%s block %lu from %lx:%lx v%lu (new location is %lu)\n", + used ? 
"Postpone free" : "Free", + old_clean_loc >> bs->dsk.block_order, + cur.oid.inode, cur.oid.stripe, cur.version, + clean_loc >> bs->dsk.block_order); +#endif + if (used) + uo_it->second.freed_block = 1 + (old_clean_loc >> bs->dsk.block_order); + else + bs->data_alloc->set(old_clean_loc >> bs->dsk.block_order, false); + } + if (has_delete) + { + assert(clean_loc == old_clean_loc); + auto uo_it = bs->used_clean_objects.find((obj_ver_id){ .oid = cur.oid, .version = old_clean_ver }); + bool used = uo_it != bs->used_clean_objects.end(); +#ifdef BLOCKSTORE_DEBUG + printf("%s block %lu from %lx:%lx v%lu (delete)\n", + used ? "Postpone free" : "Free", + clean_loc >> bs->dsk.block_order, + cur.oid.inode, cur.oid.stripe, cur.version); +#endif + if (used) + uo_it->second.freed_block = 1 + (clean_loc >> bs->dsk.block_order); + else + bs->data_alloc->set(clean_loc >> bs->dsk.block_order, false); + } } bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base) { - if (wait_state == wait_base) - goto resume_0; - else if (wait_state == wait_base+1) - goto resume_1; - else if (wait_state == wait_base+2) - goto resume_2; + if (wait_state == wait_base) goto resume_0; + else if (wait_state == wait_base+1) goto resume_1; + else if (wait_state == wait_base+2) goto resume_2; if (!(fsync_meta ? bs->disable_meta_fsync : bs->disable_data_fsync)) { cur_sync = flusher->syncs.end(); @@ -1006,7 +1311,7 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base) resume_2: if (wait_count > 0) { - wait_state = 2; + wait_state = wait_base+2; return false; } // Sync completed. All previous coroutines waiting for it must be resumed @@ -1016,7 +1321,7 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base) else { // Wait until someone else sends and completes a sync. 
- wait_state = 1; + wait_state = wait_base+1; return false; } } @@ -1029,3 +1334,86 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base) } return true; } + +bool journal_flusher_co::trim_journal(int wait_base) +{ + if (wait_state == wait_base) goto resume_0; + else if (wait_state == wait_base+1) goto resume_1; + else if (wait_state == wait_base+2) goto resume_2; + else if (wait_state == wait_base+3) goto resume_3; + else if (wait_state == wait_base+4) goto resume_4; + flusher->journal_trim_counter = 0; + new_trim_pos = bs->journal.get_trim_pos(); + if (new_trim_pos != bs->journal.used_start) + { + resume_0: + // Wait for other coroutines trimming the journal, if any + if (flusher->trimming) + { + wait_state = wait_base+0; + return false; + } + flusher->trimming = true; + // Recheck the position with the "lock" taken + new_trim_pos = bs->journal.get_trim_pos(); + if (new_trim_pos != bs->journal.used_start) + { + // First update journal "superblock" and only then update in memory + await_sqe(1); + *((journal_entry_start*)flusher->journal_superblock) = { + .crc32 = 0, + .magic = JOURNAL_MAGIC, + .type = JE_START, + .size = sizeof(journal_entry_start), + .reserved = 0, + .journal_start = new_trim_pos, + .version = JOURNAL_VERSION_V2, + .data_csum_type = bs->dsk.data_csum_type, + .csum_block_size = bs->dsk.csum_block_size, + }; + ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock); + data->iov = (struct iovec){ flusher->journal_superblock, bs->dsk.journal_block_size }; + data->callback = simple_callback_w; + my_uring_prep_writev(sqe, bs->dsk.journal_fd, &data->iov, 1, bs->journal.offset); + wait_count++; + resume_2: + if (wait_count > 0) + { + wait_state = wait_base+2; + return false; + } + if (!bs->disable_journal_fsync) + { + await_sqe(3); + my_uring_prep_fsync(sqe, bs->dsk.journal_fd, IORING_FSYNC_DATASYNC); + data->iov = { 0 }; + data->callback = simple_callback_w; + wait_count++; + resume_4: + if (wait_count > 0) + { + wait_state = wait_base+4; + return false; + } + } + if (new_trim_pos < bs->journal.used_start + ? 
(bs->journal.dirty_start >= bs->journal.used_start || bs->journal.dirty_start < new_trim_pos) + : (bs->journal.dirty_start >= bs->journal.used_start && bs->journal.dirty_start < new_trim_pos)) + { + bs->journal.dirty_start = new_trim_pos; + } + bs->journal.used_start = new_trim_pos; +#ifdef BLOCKSTORE_DEBUG + printf("Journal trimmed to %08lx (next_free=%08lx dirty_start=%08lx)\n", bs->journal.used_start, bs->journal.next_free, bs->journal.dirty_start); +#endif + if (bs->journal.flush_journal && !flusher->flush_queue.size()) + { + assert(bs->journal.used_start == bs->journal.next_free); + printf("Journal flushed\n"); + exit(0); + } + } + flusher->trimming = false; + } + return true; +} diff --git a/src/blockstore_flush.h b/src/blockstore_flush.h index b676d01c..aa618a58 100644 --- a/src/blockstore_flush.h +++ b/src/blockstore_flush.h @@ -1,10 +1,18 @@ // Copyright (c) Vitaliy Filippov, 2019+ // License: VNPL-1.1 (see README.md for details) +#define COPY_BUF_JOURNAL 1 +#define COPY_BUF_DATA 2 +#define COPY_BUF_ZERO 4 +#define COPY_BUF_CSUM_FILL 8 +#define COPY_BUF_COALESCED 16 + struct copy_buffer_t { - uint64_t offset, len; - void *buf; + int copy_flags; + uint64_t offset, len, disk_offset; + uint64_t journal_sector; // only for reads: sector+1 if used and !journal.inmemory, otherwise 0 + void *buf; // only for writes: new checksum data uint8_t *csum_buf; }; @@ -38,7 +46,7 @@ class journal_flusher_co { blockstore_impl_t *bs; journal_flusher_t *flusher; - int wait_state, wait_count; + int wait_state, wait_count, wait_journal_count; struct io_uring_sqe *sqe; struct ring_data_t *data; @@ -47,29 +55,39 @@ class journal_flusher_co obj_ver_id cur; std::map::iterator dirty_it, dirty_start, dirty_end; std::map::iterator repeat_it; - std::function simple_callback_r, simple_callback_w; + std::function simple_callback_r, simple_callback_rj, simple_callback_w; bool skip_copy, has_delete, has_writes; std::vector v; std::vector::iterator it; + int i; + bool fill_incomplete, cleared_incomplete; + int read_to_fill_incomplete; int copy_count; - uint64_t clean_loc, old_clean_loc; + uint64_t clean_loc, old_clean_loc, old_clean_ver; flusher_meta_write_t meta_old, meta_new; bool clean_init_bitmap; uint64_t clean_bitmap_offset, clean_bitmap_len; - uint8_t *clean_init_data_csum; - void *new_clean_bitmap; + uint8_t *clean_init_dyn_ptr; + uint8_t *new_clean_bitmap; uint64_t new_trim_pos; - // local: scan_dirty() - uint64_t offset, end_offset, submit_offset, submit_len; - friend class journal_flusher_t; - bool scan_dirty(int wait_base); + void scan_dirty(); + bool read_dirty(int wait_base); + bool modify_meta_do_reads(int wait_base); + bool wait_meta_reads(int wait_base); bool modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base); + bool clear_incomplete_csum_block_bits(int wait_base); + void calc_block_checksums(uint32_t *new_data_csums, bool skip_overwrites); + void update_metadata_entry(); + bool write_meta_block(flusher_meta_write_t & meta_block, int wait_base); void update_clean_db(); + void free_data_blocks(); bool fsync_batch(bool fsync_meta, int wait_base); + bool trim_journal(int wait_base); + void free_buffers(); public: journal_flusher_co(); bool loop(); @@ -97,9 +115,10 @@ class journal_flusher_t std::map meta_sectors; std::deque flush_queue; - std::map flush_versions; + std::map flush_versions; // FIXME: consider unordered_map? 
bool try_find_older(std::map::iterator & dirty_end, obj_ver_id & cur); + bool try_find_other(std::map::iterator & dirty_end, obj_ver_id & cur); public: journal_flusher_t(blockstore_impl_t *bs); diff --git a/src/blockstore_impl.h b/src/blockstore_impl.h index 6f966294..381bdefa 100644 --- a/src/blockstore_impl.h +++ b/src/blockstore_impl.h @@ -177,15 +177,23 @@ struct __attribute__((__packed__)) dirty_entry // Suspend operation until there is some free space on the data device #define WAIT_FREE 5 -struct fulfill_read_t +struct used_clean_obj_t { - uint64_t offset, len; - uint64_t journal_sector; // sector+1 if used and !journal.inmemory, otherwise 0 - uint32_t item_state; - uint64_t disk_offset; - void *csum; + int refs; + uint64_t freed_block; // block+1 if freed, otherwise 0 + uint8_t *meta; // metadata copy }; +// https://github.com/algorithm-ninja/cpp-btree +// https://github.com/greg7mdp/sparsepp/ was used previously, but it was TERRIBLY slow after resizing +// with sparsepp, random reads dropped to ~700 iops very fast with just as much as ~32k objects in the DB +typedef btree::btree_map blockstore_clean_db_t; +typedef std::map blockstore_dirty_db_t; + +#include "blockstore_init.h" + +#include "blockstore_flush.h" + #define PRIV(op) ((blockstore_op_private_t*)(op)->private_data) #define FINISH_OP(op) PRIV(op)->~blockstore_op_private_t(); std::function(op->callback)(op) @@ -198,7 +206,8 @@ struct blockstore_op_private_t int op_state; // Read - std::vector read_vec; + uint64_t clean_version_used; + std::vector read_vec; // Sync, write int min_flushed_journal_sector, max_flushed_journal_sector; @@ -214,16 +223,6 @@ struct blockstore_op_private_t int sync_small_checked, sync_big_checked; }; -// https://github.com/algorithm-ninja/cpp-btree -// https://github.com/greg7mdp/sparsepp/ was used previously, but it was TERRIBLY slow after resizing -// with sparsepp, random reads dropped to ~700 iops very fast with just as much as ~32k objects in the DB -typedef btree::btree_map blockstore_clean_db_t; -typedef std::map blockstore_dirty_db_t; - -#include "blockstore_init.h" - -#include "blockstore_flush.h" - typedef uint32_t pool_id_t; typedef uint64_t pool_pg_id_t; @@ -285,6 +284,9 @@ class blockstore_impl_t int big_to_flush = 0; int write_iodepth = 0; + // clean data blocks referenced by read operations + std::map used_clean_objects; + bool live = false, queue_stall = false; ring_loop_t *ringloop; timerfd_manager_t *tfd; @@ -327,8 +329,21 @@ class blockstore_impl_t // Read int dequeue_read(blockstore_op_t *read_op); - int fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end, + void find_holes(std::vector & read_vec, uint32_t item_start, uint32_t item_end, + std::function callback); + int fulfill_read(blockstore_op_t *read_op, uint64_t & fulfilled, uint32_t item_start, uint32_t item_end, uint32_t item_state, uint64_t item_version, uint64_t item_location, uint64_t journal_sector, uint8_t *csum); + int fill_partial_checksum_blocks(std::vector & rv, uint64_t & fulfilled, + uint8_t *clean_entry_bitmap, uint8_t *read_buf, uint64_t read_offset, uint64_t read_end); + bool read_range_fulfilled(std::vector & rv, uint64_t & fulfilled, uint8_t *read_buf, + uint8_t *clean_entry_bitmap, uint32_t item_start, uint32_t item_end); + bool read_clean_checksum_block(blockstore_op_t *op, int rv_pos, + uint64_t &fulfilled, uint64_t clean_loc, uint32_t item_start, uint32_t item_end); + bool verify_padded_checksums(uint8_t *clean_entry_bitmap, uint32_t offset, + iovec *iov, 
int n_iov, std::function bad_block_cb); + bool verify_journal_checksums(uint8_t *csums, uint32_t offset, + iovec *iov, int n_iov, std::function bad_block_cb); + bool verify_read_padded_checksums(blockstore_op_t *op, uint64_t clean_loc, iovec *iov, int n_iov); int fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len, uint32_t item_state, uint64_t item_version); void handle_read_event(ring_data_t *data, blockstore_op_t *op); diff --git a/src/blockstore_init.cpp b/src/blockstore_init.cpp index 45acac09..5eb81b75 100644 --- a/src/blockstore_init.cpp +++ b/src/blockstore_init.cpp @@ -796,24 +796,32 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u else { uint32_t *block_csums = (uint32_t*)((uint8_t*)je + sizeof(journal_entry_small_write) + bs->dsk.clean_entry_bitmap_size); - uint32_t block_crc32 = 0; + uint32_t start = je->small_write.offset / bs->dsk.csum_block_size; + uint32_t end = (je->small_write.offset+je->small_write.len-1) / bs->dsk.csum_block_size; int sd_num = 0; size_t sd_pos = 0; - for (uint64_t pos = 0; pos < je->small_write.len; pos += bs->dsk.csum_block_size, block_csums++) + for (uint32_t pos = start; pos <= end; pos++, block_csums++) { - size_t block_left = bs->dsk.csum_block_size; - block_crc32 = 0; + size_t block_left = (pos == start + ? (start == end + ? je->small_write.len + : bs->dsk.csum_block_size - je->small_write.offset%bs->dsk.csum_block_size) + : (pos < end + ? bs->dsk.csum_block_size + : (je->small_write.offset + je->small_write.len)%bs->dsk.csum_block_size)); + uint32_t block_crc32 = 0; while (block_left > 0) { + assert(sd_num < small_write_data.size()); if (small_write_data[sd_num].iov_len >= sd_pos+block_left) { - block_crc32 = crc32c(block_crc32, small_write_data[sd_num].iov_base+sd_pos, block_left); + block_crc32 = crc32c(block_crc32, (uint8_t*)small_write_data[sd_num].iov_base+sd_pos, block_left); sd_pos += block_left; break; } else { - block_crc32 = crc32c(block_crc32, small_write_data[sd_num].iov_base+sd_pos, small_write_data[sd_num].iov_len-sd_pos); + block_crc32 = crc32c(block_crc32, (uint8_t*)small_write_data[sd_num].iov_base+sd_pos, small_write_data[sd_num].iov_len-sd_pos); block_left -= (small_write_data[sd_num].iov_len-sd_pos); sd_pos = 0; sd_num++; @@ -821,8 +829,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u } if (block_crc32 != *block_csums) { - printf("Journal entry data is corrupt (block %lu crc32 %x != %x)\n", - pos / bs->dsk.csum_block_size, block_crc32, *block_csums); + printf("Journal entry data is corrupt (block %u crc32 %x != %x)\n", + pos, block_crc32, *block_csums); data_csum_valid = false; break; } @@ -848,7 +856,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u .oid = je->small_write.oid, .version = je->small_write.version, }; - uint64_t dyn_size = bs->dsk.dirty_dyn_size(je->small_write.len); + uint64_t dyn_size = bs->dsk.dirty_dyn_size(je->small_write.offset, je->small_write.len); void *dyn = NULL; void *dyn_from = (uint8_t*)je + sizeof(journal_entry_small_write); if (dyn_size <= sizeof(void*)) @@ -937,7 +945,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u .oid = je->big_write.oid, .version = je->big_write.version, }; - uint64_t dyn_size = bs->dsk.dirty_dyn_size(je->big_write.len); + uint64_t dyn_size = bs->dsk.dirty_dyn_size(je->big_write.offset, je->big_write.len); void *dyn = NULL; void *dyn_from = (uint8_t*)je + sizeof(journal_entry_big_write); if (dyn_size <= 
sizeof(void*)) diff --git a/src/blockstore_journal.cpp b/src/blockstore_journal.cpp index bbb60cdf..1f3ff7d4 100644 --- a/src/blockstore_journal.cpp +++ b/src/blockstore_journal.cpp @@ -17,6 +17,7 @@ blockstore_journal_check_t::blockstore_journal_check_t(blockstore_impl_t *bs) // Check if we can write entries of bytes and data bytes after them to the journal int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries_required, int size, int data_after) { + uint64_t prev_next = next_sector; int required = entries_required; while (1) { @@ -35,11 +36,19 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries } required -= fits; next_in_pos += fits * size; - sectors_to_write++; + if (next_sector != prev_next || !sectors_to_write) + { + // Except the previous call to this function + sectors_to_write++; + } } else if (bs->journal.sector_info[next_sector].dirty) { - sectors_to_write++; + if (next_sector != prev_next || !sectors_to_write) + { + // Except the previous call to this function + sectors_to_write++; + } } if (required <= 0) { @@ -289,3 +298,31 @@ void journal_t::dump_diagnostics() journal_used_it == used_sectors.end() ? 0 : journal_used_it->second ); } + +static uint64_t zero_page[4096]; + +uint32_t crc32c_pad(uint32_t prev_crc, const void *buf, size_t len, size_t left_pad, size_t right_pad) +{ + uint32_t r = prev_crc; + while (left_pad >= 4096) + { + r = crc32c(r, zero_page, 4096); + left_pad -= 4096; + } + if (left_pad > 0) + r = crc32c(r, zero_page, left_pad); + r = crc32c(r, buf, len); + while (right_pad >= 4096) + { + r = crc32c(r, zero_page, 4096); + right_pad -= 4096; + } + if (right_pad > 0) + r = crc32c(r, zero_page, right_pad); + return r; +} + +uint32_t crc32c_nopad(uint32_t prev_crc, const void *buf, size_t len, size_t left_pad, size_t right_pad) +{ + return crc32c(0, buf, len); +} diff --git a/src/blockstore_journal.h b/src/blockstore_journal.h index 4388bb7c..dcfffacb 100644 --- a/src/blockstore_journal.h +++ b/src/blockstore_journal.h @@ -227,3 +227,6 @@ struct blockstore_journal_check_t }; journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size); + +uint32_t crc32c_pad(uint32_t prev_crc, const void *buf, size_t len, size_t left_pad, size_t right_pad); +uint32_t crc32c_nopad(uint32_t prev_crc, const void *buf, size_t len, size_t left_pad, size_t right_pad); diff --git a/src/blockstore_read.cpp b/src/blockstore_read.cpp index cf52b061..00765de6 100644 --- a/src/blockstore_read.cpp +++ b/src/blockstore_read.cpp @@ -1,6 +1,7 @@ // Copyright (c) Vitaliy Filippov, 2019+ // License: VNPL-1.1 (see README.md for details) +#include #include "blockstore_impl.h" int blockstore_impl_t::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len, @@ -40,64 +41,98 @@ int blockstore_impl_t::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_ return 1; } -// FIXME I've seen a bug here so I want some tests +void blockstore_impl_t::find_holes(std::vector & read_vec, + uint32_t item_start, uint32_t item_end, + std::function callback) +{ + auto cur_start = item_start; + auto alloc_start = item_start; + int i = 0; + while (1) + { + // COPY_BUF_CSUM_FILL items are fake items inserted in the end, their offsets aren't in order + for (; i < read_vec.size() && !(read_vec[i].copy_flags & COPY_BUF_CSUM_FILL); i++) + { + if (read_vec[i].offset >= cur_start) + break; + else if (read_vec[i].offset + read_vec[i].len > cur_start) + { + // Allocated: cur_start ..
read_vec[i].offset + read_vec[i].len + cur_start = read_vec[i].offset + read_vec[i].len; + if (cur_start >= item_end) + goto endwhile; + } + } + if (i < read_vec.size() && !(read_vec[i].copy_flags & COPY_BUF_CSUM_FILL) && read_vec[i].offset == cur_start) + { + // Allocated - don't move alloc_start + } + else + { + // Hole + uint32_t cur_end = (i == read_vec.size() || (read_vec[i].copy_flags & COPY_BUF_CSUM_FILL) || read_vec[i].offset >= item_end + ? item_end : read_vec[i].offset); + if (alloc_start < cur_start) + i += callback(i, true, alloc_start, cur_start); + i += callback(i, false, cur_start, cur_end); + alloc_start = cur_end; + } + if (i >= read_vec.size() || (read_vec[i].copy_flags & COPY_BUF_CSUM_FILL)) + break; + cur_start = read_vec[i].offset + read_vec[i].len; + if (cur_start >= item_end) + break; + } +endwhile: + if (alloc_start < cur_start) + i += callback(i, true, alloc_start, cur_start); +} + int blockstore_impl_t::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end, uint32_t item_state, uint64_t item_version, uint64_t item_location, uint64_t journal_sector, uint8_t *csum) { + int r = 1; uint32_t cur_start = item_start; if (cur_start < read_op->offset + read_op->len && item_end > read_op->offset) { cur_start = cur_start < read_op->offset ? read_op->offset : cur_start; item_end = item_end > read_op->offset + read_op->len ? read_op->offset + read_op->len : item_end; - auto it = PRIV(read_op)->read_vec.begin(); - while (1) + find_holes(PRIV(read_op)->read_vec, cur_start, item_end, [&](int pos, bool alloc, uint32_t start, uint32_t end) { - for (; it != PRIV(read_op)->read_vec.end(); it++) + if (alloc) + return 0; + copy_buffer_t el = { + .copy_flags = (IS_JOURNAL(item_state) ? COPY_BUF_JOURNAL : COPY_BUF_DATA), + .offset = start, + .len = end-start, + .disk_offset = item_location + el.offset - item_start, + .journal_sector = journal_sector, + .csum_buf = !csum ? NULL : (csum + (cur_start - item_start) / dsk.csum_block_size * (dsk.data_csum_type & 0xFF)), + }; + if (IS_BIG_WRITE(item_state)) { - if (it->offset >= cur_start) - { - break; - } - else if (it->offset + it->len > cur_start) - { - cur_start = it->offset + it->len; - if (cur_start >= item_end) - { - goto endwhile; - } - } + // If we don't track it then we may IN THEORY read another object's data: + // submit read -> remove the object -> flush remove -> overwrite with another object -> finish read + // Very improbable, but possible + PRIV(read_op)->clean_version_used = 1; } - if (it == PRIV(read_op)->read_vec.end() || it->offset > cur_start) + PRIV(read_op)->read_vec.insert(PRIV(read_op)->read_vec.begin() + pos, el); + if (!fulfill_read_push(read_op, + (uint8_t*)read_op->buf + el.offset - read_op->offset, + item_location + el.offset - item_start, + el.len, item_state, item_version)) { - fulfill_read_t el = { - .offset = cur_start, - .len = it == PRIV(read_op)->read_vec.end() || it->offset >= item_end ? item_end-cur_start : it->offset-cur_start, - .journal_sector = journal_sector, - .item_state = item_state, - .disk_offset = item_location + el.offset - item_start, - .csum = !csum ? 
NULL : (csum + (cur_start - item_start) / dsk.csum_block_size * (dsk.data_csum_type & 0xFF)), - }; - it = PRIV(read_op)->read_vec.insert(it, el); - if (!fulfill_read_push(read_op, - (uint8_t*)read_op->buf + el.offset - read_op->offset, - item_location + el.offset - item_start, - el.len, item_state, item_version)) - { - return 0; - } - fulfilled += el.len; + PRIV(read_op)->read_vec.clear(); + r = 0; + return 0; } - cur_start = it->offset + it->len; - if (it == PRIV(read_op)->read_vec.end() || cur_start >= item_end) - { - break; - } - } + fulfilled += el.len; + return 1; + }); } -endwhile: - return 1; + return r; } uint8_t* blockstore_impl_t::get_clean_entry_bitmap(uint64_t block_loc, int offset) @@ -115,6 +150,175 @@ uint8_t* blockstore_impl_t::get_clean_entry_bitmap(uint64_t block_loc, int offse return clean_entry_bitmap; } +int blockstore_impl_t::fill_partial_checksum_blocks(std::vector & rv, uint64_t & fulfilled, + uint8_t *clean_entry_bitmap, uint8_t *read_buf, uint64_t read_offset, uint64_t read_end) +{ + if (read_end == read_offset) + return 0; + int required = 0; + read_buf -= read_offset; + uint32_t last_block = (read_end-1)/dsk.csum_block_size; + uint32_t start_block = read_offset/dsk.csum_block_size; + uint32_t end_block = 0; + while (start_block <= last_block) + { + if (read_range_fulfilled(rv, fulfilled, read_buf, clean_entry_bitmap, + start_block*dsk.csum_block_size < read_offset ? read_offset : start_block*dsk.csum_block_size, + (start_block+1)*dsk.csum_block_size > read_end ? read_end : (start_block+1)*dsk.csum_block_size)) + { + // read_range_fulfilled() also adds zero-filled areas + start_block++; + } + else + { + // Find a sequence of checksum blocks required to be read + end_block = start_block; + while ((end_block+1)*dsk.csum_block_size < read_end && + !read_range_fulfilled(rv, fulfilled, read_buf, clean_entry_bitmap, + (end_block+1)*dsk.csum_block_size < read_offset ? read_offset : (end_block+1)*dsk.csum_block_size, + (end_block+2)*dsk.csum_block_size > read_end ? 
read_end : (end_block+2)*dsk.csum_block_size)) + { + end_block++; + } + end_block++; + // OK, mark this range as required + rv.push_back((copy_buffer_t){ + .copy_flags = COPY_BUF_CSUM_FILL, + .offset = start_block*dsk.csum_block_size, + .len = (end_block-start_block)*dsk.csum_block_size, + }); + start_block = end_block; + required++; + } + } + return required; +} + +// read_buf should be == op->buf - op->offset +bool blockstore_impl_t::read_range_fulfilled(std::vector & rv, uint64_t & fulfilled, uint8_t *read_buf, + uint8_t *clean_entry_bitmap, uint32_t item_start, uint32_t item_end) +{ + bool all_done = true; + find_holes(rv, item_start, item_end, [&](int pos, bool alloc, uint32_t cur_start, uint32_t cur_end) + { + if (alloc) + return 0; + int diff = 0; + uint32_t bmp_start = cur_start/dsk.bitmap_granularity; + uint32_t bmp_end = cur_end/dsk.bitmap_granularity; + uint32_t bmp_pos = bmp_start; + while (bmp_pos < bmp_end) + { + while (bmp_pos < bmp_end && !(clean_entry_bitmap[bmp_pos >> 3] & (1 << (bmp_pos & 0x7)))) + bmp_pos++; + if (bmp_pos > bmp_start) + { + // zero fill + copy_buffer_t el = { + .copy_flags = COPY_BUF_ZERO, + .offset = bmp_start*dsk.bitmap_granularity, + .len = (bmp_pos-bmp_start)*dsk.bitmap_granularity, + }; + rv.insert(rv.begin() + pos, el); + if (read_buf) + memset(read_buf + el.offset, 0, el.len); + fulfilled += el.len; + diff++; + } + bmp_start = bmp_pos; + while (bmp_pos < bmp_end && (clean_entry_bitmap[bmp_pos >> 3] & (1 << (bmp_pos & 0x7)))) + bmp_pos++; + if (bmp_pos > bmp_start) + { + // something is to be read + all_done = false; + } + bmp_start = bmp_pos; + } + return diff; + }); + return all_done; +} + +bool blockstore_impl_t::read_clean_checksum_block(blockstore_op_t *op, int rv_pos, + uint64_t &fulfilled, uint64_t clean_loc, uint32_t item_start, uint32_t item_end) +{ + auto & rv = PRIV(op)->read_vec; + uint32_t fill_size = 0; + int n_iov = 0; + find_holes(rv, item_start, item_end, [&](int pos, bool alloc, uint32_t cur_start, uint32_t cur_end) + { + if (alloc) + fill_size += cur_end-cur_start; + n_iov++; + return 0; + }); + void *buf = memalign_or_die(MEM_ALIGNMENT, fill_size + n_iov*sizeof(struct iovec)); + iovec *iov = (struct iovec*)((uint8_t*)buf+fill_size); + n_iov = 0; + fill_size = 0; + find_holes(rv, item_start, item_end, [&](int pos, bool alloc, uint32_t cur_start, uint32_t cur_end) + { + if (alloc) + { + iov[n_iov++] = (struct iovec){ (uint8_t*)buf+fill_size, cur_end-cur_start }; + fill_size += cur_end-cur_start; + } + else + { + iov[n_iov++] = (struct iovec){ (uint8_t*)op->buf+cur_start-op->offset, cur_end-cur_start }; + rv.insert(rv.begin() + pos, (copy_buffer_t){ + .copy_flags = COPY_BUF_DATA, + .offset = cur_start, + .len = cur_end-cur_start, + .disk_offset = clean_loc, + }); + fulfilled += cur_end-cur_start; + return 1; + } + return 0; + }); + // Save buf into read_vec too but in a creepy way + // FIXME: Shit, something else should be invented %) + rv[rv.size()-rv_pos] = (copy_buffer_t){ + .copy_flags = COPY_BUF_CSUM_FILL, + .offset = 0xffffffff, + .len = ((uint64_t)n_iov << 32) | fill_size, + .disk_offset = clean_loc + item_start, + .csum_buf = (uint8_t*)buf, + }; + uint32_t d_pos = 0; + for (int n_pos = 0; n_pos < n_iov; n_pos += IOV_MAX) + { + int n_cur = n_iov-n_pos < IOV_MAX ? 
n_iov-n_pos : IOV_MAX; + BS_SUBMIT_GET_SQE(sqe, data); + PRIV(op)->pending_ops++; + my_uring_prep_readv( + sqe, dsk.data_fd, iov + n_pos, n_cur, dsk.data_offset + clean_loc + d_pos + ); + data->callback = [this, op](ring_data_t *data) { handle_read_event(data, op); }; + if (n_pos > 0 || n_pos + IOV_MAX < n_iov) + { + uint32_t d_len = 0; + for (int i = 0; i < IOV_MAX; i++) + d_len += iov[n_pos+i].iov_len; + data->iov.iov_len = d_len; + d_pos += d_len; + } + else + data->iov.iov_len = item_end-item_start; + } + // Reading may race with flushing. + // - Flushing happens in 3 steps: (2) punch holes in meta -> (4) update data -> (6) update meta + // - Reading may start/end at: 1/3, 1/5, 1/7, 3/5, 3/7, 5/7 + // - 1/3, 1/5, 3/5 are not a problem because we'll check data using punched bitmap and CRCs + // - For 1/7, 3/7 and 5/7 to finish correctly we need a copy of punched metadata + // otherwise the checksum may not match + // So flushers save a copy of punched metadata if the object is being read during (6). + PRIV(op)->clean_version_used = 1; + return true; +} + int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) { auto & clean_db = clean_db_shard(read_op->oid); @@ -136,6 +340,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) } uint64_t fulfilled = 0; PRIV(read_op)->pending_ops = 0; + PRIV(read_op)->clean_version_used = 0; uint64_t result_version = 0; if (dirty_found) { @@ -153,7 +358,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) FINISH_OP(read_op); return 2; } - size_t dyn_size = dsk.dirty_dyn_size(dirty.len); + size_t dyn_size = dsk.dirty_dyn_size(dirty.offset, dirty.len); uint8_t *bmp_ptr = (uint8_t*)(dyn_size > sizeof(void*) ? dirty.dyn_data : &dirty.dyn_data); if (!result_version) { @@ -164,6 +369,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) } } // If inmemory_journal is false, journal trim will have to wait until the read is completed + // FIXME: Verify checksums when reading from journal disk if (!fulfill_read(read_op, fulfilled, dirty.offset, dirty.offset + dirty.len, dirty.state, dirty_it->first.version, dirty.location + (IS_JOURNAL(dirty.state) ? 0 : dirty.offset), (IS_JOURNAL(dirty.state) ? dirty.journal_sector+1 : 0), @@ -206,6 +412,30 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) return 0; } } + else if (dsk.csum_block_size > dsk.bitmap_granularity) + { + auto & rv = PRIV(read_op)->read_vec; + int req = fill_partial_checksum_blocks(rv, fulfilled, clean_entry_bitmap, + (uint8_t*)read_op->buf, read_op->offset, read_op->offset+read_op->len); + for (int i = req; i > 0; i--) + { + auto & vi = rv[rv.size()-i]; + if (!read_clean_checksum_block(read_op, i, fulfilled, clean_it->second.location, vi.offset, vi.offset+vi.len)) + { + // need to wait. 
undo added requests, don't dequeue op + for (auto & vec: rv) + { + if (vec.copy_flags == COPY_BUF_CSUM_FILL && vec.csum_buf) + { + free(vec.csum_buf); + vec.csum_buf = NULL; + } + } + rv.clear(); + return 0; + } + } + } else { uint64_t bmp_start = 0, bmp_end = 0, bmp_size = dsk.data_block_size/dsk.bitmap_granularity; @@ -243,6 +473,13 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) } } } + // Increment counter if clean data is being read from the disk + if (PRIV(read_op)->clean_version_used) + { + obj_ver_id ov = { .oid = read_op->oid, .version = clean_it->second.version }; + used_clean_objects[ov].refs++; + PRIV(read_op)->clean_version_used = ov.version; + } } } if (!result_version) @@ -284,6 +521,114 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) return 2; } +bool blockstore_impl_t::verify_padded_checksums(uint8_t *clean_entry_bitmap, uint32_t offset, + iovec *iov, int n_iov, std::function bad_block_cb) +{ + assert(!(offset % dsk.csum_block_size)); + uint32_t *csums = (uint32_t*)(clean_entry_bitmap + 2*dsk.clean_entry_bitmap_size); + uint32_t block_csum = 0; + uint32_t block_done = 0; + uint32_t block_num = clean_entry_bitmap ? offset/dsk.csum_block_size : 0; + uint32_t bmp_pos = offset/dsk.bitmap_granularity; + for (int i = 0; i < n_iov; i++) + { + uint32_t pos = 0; + while (pos < iov[i].iov_len) + { + uint32_t start = pos; + uint8_t bit = (clean_entry_bitmap[bmp_pos >> 3] >> (bmp_pos & 0x7)) & 1; + while (pos < iov[i].iov_len && ((clean_entry_bitmap[bmp_pos >> 3] >> (bmp_pos & 0x7)) & 1) == bit) + { + pos += dsk.bitmap_granularity; + bmp_pos++; + } + uint32_t len = pos-start; + auto buf = (uint8_t*)iov[i].iov_base+start; + while (block_done+len >= dsk.csum_block_size) + { + auto cur_len = dsk.csum_block_size-block_done; + block_csum = crc32c_pad(block_csum, buf, bit ? cur_len : 0, bit ? 0 : cur_len, 0); + if (block_csum != csums[block_num]) + { + if (bad_block_cb) + bad_block_cb(block_num*dsk.csum_block_size, block_csum, csums[block_num]); + else + return false; + } + block_num++; + buf += cur_len; + len -= cur_len; + block_done = block_csum = 0; + } + if (len > 0) + { + block_csum = crc32c_pad(block_csum, buf, bit ? len : 0, bit ? 
0 : len, 0); + block_done += len; + } + } + } + assert(!block_done); + return true; +} + +bool blockstore_impl_t::verify_journal_checksums(uint8_t *csums, uint32_t offset, + iovec *iov, int n_iov, std::function bad_block_cb) +{ + uint32_t block_csum = 0; + uint32_t block_num = 0; + uint32_t block_done = offset%dsk.csum_block_size; + for (int i = 0; i < n_iov; i++) + { + uint32_t len = iov[i].iov_len; + auto buf = (uint8_t*)iov[i].iov_base; + while (block_done+len >= dsk.csum_block_size) + { + auto cur_len = dsk.csum_block_size-block_done; + block_csum = crc32c(block_csum, buf, cur_len); + if (block_csum != ((uint32_t*)csums)[block_num]) + { + if (bad_block_cb) + bad_block_cb(block_num*dsk.csum_block_size - (offset%dsk.csum_block_size), block_csum, ((uint32_t*)csums)[block_num]); + else + return false; + } + block_num++; + buf += cur_len; + len -= cur_len; + block_done = block_csum = 0; + } + if (len > 0) + { + block_csum = crc32c(block_csum, buf, len); + block_done += len; + } + } + if (block_done > 0 && block_csum != ((uint32_t*)csums)[block_num]) + { + if (bad_block_cb) + bad_block_cb(block_num*dsk.csum_block_size - (offset%dsk.csum_block_size), block_csum, ((uint32_t*)csums)[block_num]); + else + return false; + } + return true; +} + +bool blockstore_impl_t::verify_read_padded_checksums(blockstore_op_t *op, uint64_t clean_loc, iovec *iov, int n_iov) +{ + uint32_t offset = clean_loc % dsk.data_block_size; + clean_loc = (clean_loc >> dsk.block_order) << dsk.block_order; + // First verify against the newest checksum version + uint8_t *clean_entry_bitmap = get_clean_entry_bitmap(clean_loc, 0); + if (verify_padded_checksums(clean_entry_bitmap, offset, iov, n_iov, NULL)) + return true; + // Check through all relevant "metadata backups" possibly added by flushers + auto mb_it = used_clean_objects.lower_bound((obj_ver_id){ .oid = op->oid, .version = PRIV(op)->clean_version_used }); + for (; mb_it != used_clean_objects.end() && mb_it->first.oid == op->oid; mb_it++) + if (mb_it->second.meta != NULL && verify_padded_checksums(mb_it->second.meta, offset, iov, n_iov, NULL)) + return true; + return false; +} + void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op) { live = true; @@ -293,43 +638,97 @@ void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op // read error op->retval = data->res; } - else if (dsk.csum_block_size) + if (PRIV(op)->pending_ops == 0) { - // verify checksum if required - uint64_t el_offset = (uint8_t*)data->iov.iov_base - ((uint8_t*)op->buf - op->offset); - auto & vecs = PRIV(op)->read_vec; - int el_min = 0, el_max = vecs.size(); - while (el_max > el_min+1) + if (dsk.csum_block_size) { - int mid = (el_min+el_max)/2; - if (el_offset < vecs[mid].offset) - el_max = mid; - else - el_min = mid; - } - auto & vec = vecs[el_min]; - assert(vec.offset == el_offset); - if (vec.csum) - { - uint32_t *csum = (uint32_t*)vec.csum; - for (size_t p = 0; p < data->iov.iov_len; p += dsk.csum_block_size, csum++) + // verify checksums if required + auto & rv = PRIV(op)->read_vec; + if (dsk.csum_block_size > dsk.bitmap_granularity) { - if (crc32c(0, data->iov.iov_base + p, dsk.csum_block_size) != *csum) + for (int i = rv.size()-1; i >= 0 && rv[i].copy_flags == COPY_BUF_CSUM_FILL; i--) { - // checksum error - printf( - "Checksum mismatch in %s area at offset 0x%lx: %08lx vs %08lx\n", - IS_JOURNAL(vec.item_state) ? 
"journal" : "data", - crc32c(0, data->iov.iov_base + p, dsk.csum_block_size), *csum - ); - op->retval = -EDOM; - break; + struct iovec *iov = (struct iovec*)(rv[i].csum_buf + (rv[i].len & 0xFFFFFFFF)); + if (!verify_read_padded_checksums(op, rv[i].disk_offset, iov, rv[i].len >> 32)) + op->retval = -EDOM; + free(rv[i].csum_buf); + rv[i].csum_buf = NULL; + } + } + else + { + for (auto & vec: rv) + { + if (vec.csum_buf) + { + uint32_t *csum = (uint32_t*)vec.csum_buf; + for (size_t p = 0; p < data->iov.iov_len; p += dsk.csum_block_size, csum++) + { + if (crc32c(0, (uint8_t*)data->iov.iov_base + p, dsk.csum_block_size) != *csum) + { + // checksum error + printf( + "Checksum mismatch in object %lx:%lx v%lu in %s area at offset 0x%lx: %08x vs %08x\n", + op->oid.inode, op->oid.stripe, op->version, + (vec.copy_flags & COPY_BUF_JOURNAL) ? "journal" : "data", vec.disk_offset, + crc32c(0, (uint8_t*)data->iov.iov_base + p, dsk.csum_block_size), *csum + ); + op->retval = -EDOM; + break; + } + } + } + } + } + } + if (PRIV(op)->clean_version_used) + { + // Release clean data block + obj_ver_id ov = { .oid = op->oid, .version = PRIV(op)->clean_version_used }; + auto uo_it = used_clean_objects.find(ov); + if (uo_it != used_clean_objects.end()) + { + uo_it->second.refs--; + if (uo_it->second.refs <= 0) + { + // Check to the left - even older usage entries may exist + bool still_used = false; + while (uo_it != used_clean_objects.begin()) + { + uo_it--; + if (uo_it->first.oid != op->oid) + { + uo_it++; + break; + } + if (uo_it->second.refs > 0) + { + still_used = true; + break; + } + } + // Free uo_it AND all following records with refs==0 too + if (!still_used) + { + while (uo_it != used_clean_objects.end() && + uo_it->first.oid == op->oid && + uo_it->second.refs == 0) + { + if (uo_it->second.freed_block > 0) + { + data_alloc->set(uo_it->second.freed_block-1, false); + } + if (uo_it->second.meta) + { + free(uo_it->second.meta); + uo_it->second.meta = NULL; + } + used_clean_objects.erase(uo_it++); + } + } } } } - } - if (PRIV(op)->pending_ops == 0) - { if (!journal.inmemory) { // Release journal sector usage @@ -370,7 +769,7 @@ int blockstore_impl_t::read_bitmap(object_id oid, uint64_t target_version, void *result_version = dirty_it->first.version; if (bitmap) { - size_t dyn_size = dsk.dirty_dyn_size(dirty_it->second.len); + size_t dyn_size = dsk.dirty_dyn_size(dirty_it->second.offset, dirty_it->second.len); void *dyn_ptr = (dyn_size > sizeof(void*) ? 
dirty_it->second.dyn_data : &dirty_it->second.dyn_data); memcpy(bitmap, dyn_ptr, dsk.clean_entry_bitmap_size); } diff --git a/src/blockstore_rollback.cpp b/src/blockstore_rollback.cpp index ed0e0671..59fb3343 100644 --- a/src/blockstore_rollback.cpp +++ b/src/blockstore_rollback.cpp @@ -241,10 +241,10 @@ void blockstore_impl_t::free_dirty_dyn_data(dirty_entry & e) { if (e.dyn_data) { - size_t dyn_size = dsk.dirty_dyn_size(e.len); + size_t dyn_size = dsk.dirty_dyn_size(e.offset, e.len); if (dyn_size > sizeof(void*) && (!journal.inmemory || e.dyn_data < journal.buffer || - e.dyn_data >= journal.buffer + journal.len)) + e.dyn_data >= (uint8_t*)journal.buffer + journal.len)) { // dyn_data contains the bitmap and checksums // free it if it doesn't refer to the in-memory journal diff --git a/src/blockstore_sync.cpp b/src/blockstore_sync.cpp index 58aa9766..6fb86f51 100644 --- a/src/blockstore_sync.cpp +++ b/src/blockstore_sync.cpp @@ -78,7 +78,23 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) // 2nd step: Data device is synced, prepare & write journal entries // Check space in the journal and journal memory buffers blockstore_journal_check_t space_check(this); - if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(), + if (dsk.csum_block_size) + { + // More complex check because all journal entries have different lengths + int left = PRIV(op)->sync_big_writes.size(); + for (auto & sbw: PRIV(op)->sync_big_writes) + { + left--; + auto & dirty_entry = dirty_db.at(sbw); + uint64_t dyn_size = dsk.dirty_dyn_size(dirty_entry.offset, dirty_entry.len); + if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size, + left == 0 ? JOURNAL_STABILIZE_RESERVATION : 0)) + { + return 0; + } + } + } + else if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(), sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, JOURNAL_STABILIZE_RESERVATION)) { return 0; @@ -90,16 +106,17 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) int s = 0; while (it != PRIV(op)->sync_big_writes.end()) { - if (!journal.entry_fits(sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size) && + auto & dirty_entry = dirty_db.at(*it); + uint64_t dyn_size = dsk.dirty_dyn_size(dirty_entry.offset, dirty_entry.len); + if (!journal.entry_fits(sizeof(journal_entry_big_write) + dyn_size) && journal.sector_info[journal.cur_sector].dirty) { prepare_journal_sector_write(journal.cur_sector, op); s++; } - auto & dirty_entry = dirty_db.at(*it); journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry( journal, (dirty_entry.state & BS_ST_INSTANT) ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE, - sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size + sizeof(journal_entry_big_write) + dyn_size ); dirty_entry.journal_sector = journal.sector_info[journal.cur_sector].offset; journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++; @@ -115,7 +132,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) je->offset = dirty_entry.offset; je->len = dirty_entry.len; je->location = dirty_entry.location; - uint64_t dyn_size = dsk.dirty_dyn_size(dirty_entry.len); memcpy((void*)(je+1), (dyn_size > sizeof(void*) ? 
dirty_entry.dyn_data : &dirty_entry.dyn_data), dyn_size); je->crc32 = je_crc32((journal_entry*)je); journal.crc32_last = je->crc32; diff --git a/src/blockstore_write.cpp b/src/blockstore_write.cpp index 8cb0ed82..2327be72 100644 --- a/src/blockstore_write.cpp +++ b/src/blockstore_write.cpp @@ -13,7 +13,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) { op->len = 0; } - size_t dyn_size = dsk.dirty_dyn_size(op->len); + size_t dyn_size = dsk.dirty_dyn_size(op->offset, op->len); if (!is_del && dyn_size > sizeof(void*)) { dyn = calloc_or_die(1, dyn_size); @@ -39,7 +39,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) : ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG); if (!is_del && !deleted) { - void *dyn_from = dsk.dirty_dyn_size(dirty_it->second.len) > sizeof(void*) + void *dyn_from = dsk.dirty_dyn_size(dirty_it->second.offset, dirty_it->second.len) > sizeof(void*) ? dirty_it->second.dyn_data : &dirty_it->second.dyn_data; memcpy(dyn_ptr, dyn_from, dsk.clean_entry_bitmap_size); } @@ -163,7 +163,6 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) if (op->bitmap) { // Only allow to overwrite part of the object bitmap respective to the write's offset/len - // FIXME Don't merge bitmaps here - parallel writes into one object may break the bitmap uint32_t bit = op->offset/dsk.bitmap_granularity; uint32_t bits_left = op->len/dsk.bitmap_granularity; while (!(bit % 8) && bits_left >= 8) @@ -184,13 +183,25 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) } } // Calculate checksums - // FIXME: Allow to receive them from outside - if (dsk.data_csum_type) + // FIXME: Allow to receive checksums from outside? + if (!is_del && dsk.data_csum_type && op->len > 0) { - uint32_t *data_csum = (uint32_t*)(dyn_ptr + dsk.clean_entry_bitmap_size); - for (uint32_t i = 0; i < op->len / dsk.csum_block_size; i++) + uint32_t *data_csums = (uint32_t*)(dyn_ptr + dsk.clean_entry_bitmap_size); + uint32_t start = op->offset / dsk.csum_block_size; + uint32_t end = (op->offset+op->len-1) / dsk.csum_block_size; + auto fn = state & BS_ST_BIG_WRITE ? 
crc32c_pad : crc32c_nopad; + if (start == end) + data_csums[0] = fn(0, op->buf, op->len, op->offset - start*dsk.csum_block_size, (end+1)*dsk.csum_block_size - (op->offset+op->len)); + else { - data_csum[i] = crc32c(0, op->buf + i*dsk.csum_block_size, dsk.csum_block_size); + data_csums[0] = fn(0, op->buf, dsk.csum_block_size*(start+1)-op->offset, op->offset - start*dsk.csum_block_size, 0); + for (uint32_t i = start+1; i < end; i++) + data_csums[i-start] = crc32c(0, (uint8_t*)op->buf + dsk.csum_block_size*i-op->offset, dsk.csum_block_size); + data_csums[end-start] = fn( + 0, (uint8_t*)op->buf + end*dsk.csum_block_size - op->offset, + op->offset+op->len - end*dsk.csum_block_size, + 0, (end+1)*dsk.csum_block_size - (op->offset+op->len) + ); } } dirty_db.emplace((obj_ver_id){ @@ -377,12 +388,13 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) { // Small (journaled) write // First check if the journal has sufficient space + uint64_t dyn_size = dsk.dirty_dyn_size(op->offset, op->len); blockstore_journal_check_t space_check(this); if (unsynced_big_write_count && !space_check.check_available(op, unsynced_big_write_count, - sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, 0) + sizeof(journal_entry_big_write) + dsk.dirty_dyn_size(0, dsk.data_block_size), 0) || !space_check.check_available(op, 1, - sizeof(journal_entry_small_write) + dsk.clean_entry_bitmap_size, + sizeof(journal_entry_small_write) + dyn_size, op->len + ((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION))) { return 0; @@ -391,7 +403,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) BS_SUBMIT_CHECK_SQES( // Write current journal sector only if it's dirty and full, or in the immediate_commit mode (immediate_commit != IMMEDIATE_NONE || - !journal.entry_fits(sizeof(journal_entry_small_write) + dsk.clean_entry_bitmap_size) ? 1 : 0) + + !journal.entry_fits(sizeof(journal_entry_small_write) + dyn_size) ? 1 : 0) + (op->len > 0 ? 1 : 0) ); write_iodepth++; @@ -399,7 +411,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; if (immediate_commit == IMMEDIATE_NONE) { - if (!journal.entry_fits(sizeof(journal_entry_small_write) + dsk.clean_entry_bitmap_size)) + if (!journal.entry_fits(sizeof(journal_entry_small_write) + dyn_size)) { prepare_journal_sector_write(journal.cur_sector, op); } @@ -409,7 +421,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) } } // Then pre-fill journal entry - uint64_t dyn_size = dsk.dirty_dyn_size(op->len); journal_entry_small_write *je = (journal_entry_small_write*)prefill_single_journal_entry( journal, op->opcode == BS_OP_WRITE_STABLE ? JE_SMALL_WRITE_INSTANT : JE_SMALL_WRITE, sizeof(journal_entry_small_write) + dyn_size @@ -516,20 +527,14 @@ resume_2: .version = op->version, }); assert(dirty_it != dirty_db.end()); + uint64_t dyn_size = dsk.dirty_dyn_size(op->offset, op->len); blockstore_journal_check_t space_check(this); - if (!space_check.check_available(op, 1, - sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, + if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size, ((dirty_it->second.state & BS_ST_INSTANT) ?
JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION))) { return 0; } BS_SUBMIT_CHECK_SQES(1); - auto dirty_it = dirty_db.find((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); - assert(dirty_it != dirty_db.end()); - uint64_t dyn_size = dsk.dirty_dyn_size(op->len); journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry( journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE, sizeof(journal_entry_big_write) + dyn_size diff --git a/src/disk_tool_journal.cpp b/src/disk_tool_journal.cpp index c70913a9..42c1cbda 100644 --- a/src/disk_tool_journal.cpp +++ b/src/disk_tool_journal.cpp @@ -205,7 +205,7 @@ int disk_tool_t::process_journal_block(void *buf, std::functionsize - data_csum_size); for (uint32_t pos = 0; pos < je->small_write.len; pos += je_start.csum_block_size, block_csums++) { - if (crc32c(0, small_write_data + pos, je_start.csum_block_size) != *block_csums) + if (crc32c(0, (uint8_t*)small_write_data + pos, je_start.csum_block_size) != *block_csums) { data_csum_valid = false; break; diff --git a/tests/run_3osds.sh b/tests/run_3osds.sh index 79180a13..cebfc371 100644 --- a/tests/run_3osds.sh +++ b/tests/run_3osds.sh @@ -27,7 +27,9 @@ fi start_osd() { local i=$1 - build/src/vitastor-osd --osd_num $i --bind_address 127.0.0.1 $NO_SAME $OSD_ARGS --etcd_address $ETCD_URL $(build/src/vitastor-disk simple-offsets --format options ./testdata/test_osd$i.bin $OFFSET_ARGS 2>/dev/null) >>./testdata/osd$i.log 2>&1 & + build/src/vitastor-osd --osd_num $i --bind_address 127.0.0.1 $NO_SAME $OSD_ARGS --etcd_address $ETCD_URL \ + $(build/src/vitastor-disk simple-offsets --format options ./testdata/test_osd$i.bin $OFFSET_ARGS 2>/dev/null) \ + >>./testdata/osd$i.log 2>&1 & eval OSD${i}_PID=$! } diff --git a/tests/test_heal.sh b/tests/test_heal.sh index 319e9a58..559afbcd 100755 --- a/tests/test_heal.sh +++ b/tests/test_heal.sh @@ -12,6 +12,7 @@ PG_COUNT=32 . `dirname $0`/run_3osds.sh check_qemu +# FIXME: Fix space rebalance priorities :) IMG_SIZE=960 $ETCDCTL put /vitastor/config/inode/1/1 '{"name":"testimg","size":'$((IMG_SIZE*1024*1024))'}' @@ -24,6 +25,7 @@ kill_osds() { sleep 5 + echo Killing OSD 1 kill -9 $OSD1_PID $ETCDCTL del /vitastor/osd/state/1 @@ -38,6 +40,7 @@ kill_osds() done sleep 5 + echo Starting OSD 7 start_osd 7 sleep 5
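
Note on unaligned writes with checksums (illustrative sketch, not part of the patch): enqueue_write() and journal replay in blockstore_init.cpp split a write into a partial first checksum block, whole middle blocks and a partial last block, storing one CRC32C per touched block; because that count varies per write, blockstore_sync.cpp now checks journal space entry by entry instead of using one fixed entry size. A minimal standalone sketch of the split; the 4096-byte checksum block and the offsets below are made up for the example.

#include <cstdint>
#include <cstdio>

int main()
{
    const uint32_t csum_block_size = 4096;
    uint32_t offset = 1536, len = 10240;
    // Same start/end computation as in the patch: indices of the first and last touched blocks
    uint32_t start = offset / csum_block_size;
    uint32_t end = (offset + len - 1) / csum_block_size;
    printf("checksum entries: %u\n", end - start + 1); // 3 here, although len is only 2.5 blocks
    for (uint32_t blk = start; blk <= end; blk++)
    {
        uint32_t seg_start = (blk == start ? offset : blk * csum_block_size);
        uint32_t seg_end = (blk == end ? offset + len : (blk + 1) * csum_block_size);
        // One checksum per segment; in the patch, partial segments of big writes are
        // zero-padded to the full block (crc32c_pad), small writes are not (crc32c_nopad)
        printf("block %u: [%u, %u), %u bytes\n", blk, seg_start, seg_end, seg_end - seg_start);
    }
    return 0;
}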
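
Note on crc32c_pad() (illustrative sketch, not part of the patch): it checksums a whole checksum block of which only the middle part is actually present, by feeding zeros for the missing left and right parts. A self-contained sketch under the assumption of a plain bitwise CRC32C; crc32c_soft(), crc_zeros() and crc32c_pad_sketch() are invented stand-ins, the project uses its own crc32c(). The assert demonstrates that padding with zeros is equivalent to checksumming a fully materialized block.

#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>

// Bitwise CRC32C (Castagnoli polynomial), stand-in for the project's crc32c()
static uint32_t crc32c_soft(uint32_t crc, const uint8_t *buf, size_t len)
{
    crc = ~crc;
    for (size_t i = 0; i < len; i++)
    {
        crc ^= buf[i];
        for (int b = 0; b < 8; b++)
            crc = (crc >> 1) ^ ((crc & 1) ? 0x82F63B78 : 0);
    }
    return ~crc;
}

// Feed <count> zero bytes into the CRC in bounded chunks
static uint32_t crc_zeros(uint32_t crc, size_t count)
{
    static const uint8_t zeros[4096] = {};
    while (count > 0)
    {
        size_t step = count < sizeof(zeros) ? count : sizeof(zeros);
        crc = crc32c_soft(crc, zeros, step);
        count -= step;
    }
    return crc;
}

// Checksum of [zeros(left_pad) | buf[0..len) | zeros(right_pad)] without materializing it
static uint32_t crc32c_pad_sketch(uint32_t prev, const uint8_t *buf, size_t len, size_t left_pad, size_t right_pad)
{
    uint32_t r = crc_zeros(prev, left_pad);
    r = crc32c_soft(r, buf, len);
    return crc_zeros(r, right_pad);
}

int main()
{
    // 1000 bytes written at offset 500 of a hypothetical 4096-byte checksum block
    std::vector<uint8_t> frag(1000, 0xAB);
    uint32_t padded = crc32c_pad_sketch(0, frag.data(), frag.size(), 500, 4096 - 500 - 1000);
    // Reference: build the whole zero-filled block and checksum it directly
    std::vector<uint8_t> block(4096, 0);
    memcpy(block.data() + 500, frag.data(), frag.size());
    assert(padded == crc32c_soft(0, block.data(), block.size()));
    printf("ok: %08x\n", padded);
    return 0;
}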
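
Note on find_holes() (illustrative sketch, not part of the patch): fulfill_read() is rebuilt around a hole-walking helper that reports which sub-ranges of the request are still uncovered by read_vec. A simplified standalone model; find_holes_sketch() and range_t are invented names, and the real helper additionally reports already-allocated ranges, lets the callback insert entries in place, and skips the out-of-order COPY_BUF_CSUM_FILL placeholders.

#include <cstdint>
#include <cstdio>
#include <functional>
#include <vector>

struct range_t { uint32_t offset, len; };

// covered must be sorted and non-overlapping; cb(start, end) is called for every gap
// of [item_start, item_end) that no covered range intersects
static void find_holes_sketch(const std::vector<range_t> & covered,
    uint32_t item_start, uint32_t item_end, std::function<void(uint32_t, uint32_t)> cb)
{
    uint32_t cur = item_start;
    for (auto & r: covered)
    {
        if (r.offset + r.len <= cur)
            continue;
        if (r.offset >= item_end)
            break;
        if (r.offset > cur)
            cb(cur, r.offset);
        cur = r.offset + r.len;
        if (cur >= item_end)
            return;
    }
    if (cur < item_end)
        cb(cur, item_end);
}

int main()
{
    // Parts of the read already fulfilled, e.g. from dirty (journaled) entries
    std::vector<range_t> covered = { { 0, 4096 }, { 8192, 4096 } };
    find_holes_sketch(covered, 0, 16384, [](uint32_t start, uint32_t end)
    {
        printf("hole: [%u, %u)\n", start, end); // still needs disk data or zero fill
    });
    return 0;
}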
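
Note on read_range_fulfilled() (illustrative sketch, not part of the patch): inside a hole, granules whose bit is clear in the object bitmap were never written and are satisfied with zeros, while set granules require an actual read and checksum verification. A toy sketch with an assumed 4096-byte granularity and a single 8-granule bitmap byte.

#include <cstdint>
#include <cstdio>
#include <cstring>

int main()
{
    const uint32_t granularity = 4096;
    uint8_t bitmap = 0x0D; // granules 0, 2, 3 written; 1 and 4..7 unwritten
    static uint8_t buf[8 * granularity];
    uint32_t need_read = 0;
    for (uint32_t g = 0; g < 8; g++)
    {
        if (!(bitmap & (1 << g)))
            memset(buf + g * granularity, 0, granularity); // hole: zero fill, no I/O
        else
            need_read++; // must be read from disk and verified against its checksum
    }
    printf("granules to read: %u\n", need_read);
    return 0;
}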
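
Note on used_clean_objects (illustrative sketch, not part of the patch): a read served from a clean (already flushed) location now pins that object version so a concurrent flush cannot free the data block, or drop the metadata copy needed for checksum verification, while the read is in flight. A stripped-down model of the pin/unpin lifecycle; obj_ver, pinned_t, pin() and unpin() are invented names, and the real entry additionally stores freed_block and a metadata copy and releases older zero-ref entries of the same object.

#include <cstdint>
#include <cstdio>
#include <map>

struct obj_ver
{
    uint64_t oid, version;
    bool operator<(const obj_ver & other) const
    {
        return oid < other.oid || (oid == other.oid && version < other.version);
    }
};

struct pinned_t
{
    int refs = 0; // the real structure also tracks a freed block number and a metadata copy
};

static std::map<obj_ver, pinned_t> used_clean_objects;

// Called when a read against a clean object version is submitted
static void pin(obj_ver ov)
{
    used_clean_objects[ov].refs++;
}

// Called from the read completion handler; owned resources may only be
// released once the reference count drops to zero
static void unpin(obj_ver ov)
{
    auto it = used_clean_objects.find(ov);
    if (it != used_clean_objects.end() && --it->second.refs <= 0)
        used_clean_objects.erase(it);
}

int main()
{
    obj_ver ov = { 0x123, 7 };
    pin(ov);   // a flusher seeing refs > 0 defers freeing the old block
    unpin(ov); // last reader gone, the entry (and any deferred block) can be released
    printf("pinned entries left: %zu\n", used_clean_objects.size());
    return 0;
}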