Implement large csum_block_size support (more than 4k) + refactor blockstore_flush

hotfix-1.0.0
Vitaliy Filippov 2022-11-29 01:37:45 +03:00
parent 0b0405d115
commit 7d532880c3
17 changed files with 1548 additions and 602 deletions


@ -143,34 +143,83 @@ uint64_t allocator::get_free_count()
return free;
}
// FIXME: Move to utils?
void bitmap_set(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity)
{
if (start == 0 && len == 32*bitmap_granularity)
*((uint32_t*)bitmap) = UINT32_MAX;
else if (start == 0 && len == 64*bitmap_granularity)
*((uint64_t*)bitmap) = UINT64_MAX;
else
{
unsigned bit_start = start / bitmap_granularity;
unsigned bit_end = ((start + len) + bitmap_granularity - 1) / bitmap_granularity;
while (bit_start < bit_end)
{
if (!(bit_start & 7) && bit_end >= bit_start+8)
{
((uint8_t*)bitmap)[bit_start / 8] = UINT8_MAX;
bit_start += 8;
}
else
{
((uint8_t*)bitmap)[bit_start / 8] |= 1 << (bit_start % 8);
bit_start++;
}
}
}
}
void bitmap_clear(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity)
{
if (start == 0 && len == 32*bitmap_granularity)
*((uint32_t*)bitmap) = 0;
else if (start == 0 && len == 64*bitmap_granularity)
*((uint64_t*)bitmap) = 0;
else
{
unsigned bit_start = start / bitmap_granularity;
unsigned bit_end = ((start + len) + bitmap_granularity - 1) / bitmap_granularity;
while (bit_start < bit_end)
{
if (!(bit_start & 7) && bit_end >= bit_start+8)
{
((uint8_t*)bitmap)[bit_start / 8] = 0;
bit_start += 8;
}
else
{
((uint8_t*)bitmap)[bit_start / 8] &= (0xFF ^ (1 << (bit_start % 8)));
bit_start++;
}
}
}
}
bool bitmap_check(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity)
{
bool r = false;
if (start == 0 && len == 32*bitmap_granularity)
r = !!*((uint32_t*)bitmap);
else if (start == 0 && len == 64*bitmap_granularity)
r = !!*((uint64_t*)bitmap);
else
{
unsigned bit_start = start / bitmap_granularity;
unsigned bit_end = ((start + len) + bitmap_granularity - 1) / bitmap_granularity;
while (bit_start < bit_end)
{
if (!(bit_start & 7) && bit_end >= bit_start+8)
{
r = r || !!((uint8_t*)bitmap)[bit_start / 8];
bit_start += 8;
}
else
{
r = r || (((uint8_t*)bitmap)[bit_start / 8] & (1 << (bit_start % 8)));
bit_start++;
}
}
}
return r;
}
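A small usage sketch for the three helpers above (illustrative only, not part of the commit): start and len are byte values that are converted to bit positions using bitmap_granularity, and bitmap_check() reports whether any bit in the range is set.

#include <stdint.h>
#include <string.h>
#include <assert.h>
// assumes the bitmap_set/bitmap_clear/bitmap_check declarations from the header below

int main()
{
    const uint64_t gran = 4096, block = 131072;
    uint8_t bmp[block/gran/8] = {};                  // 32 granules -> 4 bytes
    bitmap_set(bmp, 8192, 12288, gran);              // mark granules 2..4 as written
    assert(bitmap_check(bmp, 8192, 12288, gran));    // at least one bit set in the range
    assert(!bitmap_check(bmp, 0, 8192, gran));       // granules 0..1 still clear
    bitmap_clear(bmp, 8192, 4096, gran);             // unmark granule 2
    assert(bitmap_check(bmp, 8192, 12288, gran));    // granules 3..4 remain set
    return 0;
}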


@ -23,3 +23,5 @@ public:
};
void bitmap_set(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity);
void bitmap_clear(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity);
bool bitmap_check(void *bitmap, uint64_t start, uint64_t len, uint64_t bitmap_granularity);


@ -122,11 +122,6 @@ void blockstore_disk_t::parse_config(std::map<std::string, std::string> & config
{
throw std::runtime_error("Checksum block size must be a divisor of data block size");
}
if (csum_block_size && csum_block_size != bitmap_granularity)
{
// FIXME: Support other checksum block sizes (with read-modify-write)
throw std::runtime_error("Checksum block sizes other than bitmap_granularity are not supported yet");
}
if (meta_device == "")
{
meta_device = data_device;
@ -144,7 +139,8 @@ void blockstore_disk_t::parse_config(std::map<std::string, std::string> & config
throw std::runtime_error("journal_offset must be a multiple of journal_block_size = "+std::to_string(journal_block_size));
}
clean_entry_bitmap_size = data_block_size / bitmap_granularity / 8;
clean_dyn_size = clean_entry_bitmap_size + dirty_dyn_size(data_block_size);
clean_dyn_size = clean_entry_bitmap_size*2 + (csum_block_size
? data_block_size/csum_block_size*(data_csum_type & 0xFF) : 0);
clean_entry_size = sizeof(clean_disk_entry) + clean_dyn_size + 4 /*entry_csum*/;
}
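A worked sizing example for the formulas above (hypothetical but typical values: data_block_size = 128 KiB, bitmap_granularity = 4 KiB, crc32c checksums so data_csum_type & 0xFF = 4, and a large csum_block_size of 32 KiB, which this commit makes possible): clean_entry_bitmap_size = 131072/4096/8 = 4 bytes, so clean_dyn_size = 4*2 + 131072/32768*4 = 8 + 16 = 24 bytes, and clean_entry_size adds sizeof(clean_disk_entry) plus the 4-byte entry checksum on top of that. The *2 accounts for the two per-object bitmaps (the internal write bitmap and the external bitmap) stored in each clean metadata entry.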


@ -48,8 +48,12 @@ struct blockstore_disk_t
void calc_lengths(bool skip_meta_check = false);
void close_all();
inline uint64_t dirty_dyn_size(uint64_t len)
inline uint64_t dirty_dyn_size(uint64_t offset, uint64_t len)
{
return clean_entry_bitmap_size + (csum_block_size ? len/csum_block_size * (data_csum_type & 0xFF) : 0);
// Checksums may be partial if write is not aligned with csum_block_size
return clean_entry_bitmap_size + (csum_block_size
? ((offset+len+csum_block_size-1)/csum_block_size - offset/csum_block_size)
* (data_csum_type & 0xFF)
: 0);
}
};
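The extra offset parameter matters because an unaligned write touches partial checksum blocks at both ends. A worked example with hypothetical values: with csum_block_size = 32 KiB, a 64 KiB write at offset 16 KiB ends at byte 80 KiB, so (16384+65536+32767)/32768 - 16384/32768 = 3 - 0 = 3 checksum slots are reserved (checksum blocks 0..2 are all touched), whereas the old len/csum_block_size formula would have reserved only 2.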

File diff suppressed because it is too large


@ -1,10 +1,18 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
#define COPY_BUF_JOURNAL 1
#define COPY_BUF_DATA 2
#define COPY_BUF_ZERO 4
#define COPY_BUF_CSUM_FILL 8
#define COPY_BUF_COALESCED 16
struct copy_buffer_t
{
uint64_t offset, len;
void *buf;
int copy_flags;
uint64_t offset, len, disk_offset;
uint64_t journal_sector; // only for reads: sector+1 if used and !journal.inmemory, otherwise 0
void *buf; // only for writes: new checksum data
uint8_t *csum_buf;
};
@ -38,7 +46,7 @@ class journal_flusher_co
{
blockstore_impl_t *bs;
journal_flusher_t *flusher;
int wait_state, wait_count;
int wait_state, wait_count, wait_journal_count;
struct io_uring_sqe *sqe;
struct ring_data_t *data;
@ -47,29 +55,39 @@ class journal_flusher_co
obj_ver_id cur;
std::map<obj_ver_id, dirty_entry>::iterator dirty_it, dirty_start, dirty_end;
std::map<object_id, uint64_t>::iterator repeat_it;
std::function<void(ring_data_t*)> simple_callback_r, simple_callback_w;
std::function<void(ring_data_t*)> simple_callback_r, simple_callback_rj, simple_callback_w;
bool skip_copy, has_delete, has_writes;
std::vector<copy_buffer_t> v;
std::vector<copy_buffer_t>::iterator it;
int i;
bool fill_incomplete, cleared_incomplete;
int read_to_fill_incomplete;
int copy_count;
uint64_t clean_loc, old_clean_loc;
uint64_t clean_loc, old_clean_loc, old_clean_ver;
flusher_meta_write_t meta_old, meta_new;
bool clean_init_bitmap;
uint64_t clean_bitmap_offset, clean_bitmap_len;
uint8_t *clean_init_data_csum;
void *new_clean_bitmap;
uint8_t *clean_init_dyn_ptr;
uint8_t *new_clean_bitmap;
uint64_t new_trim_pos;
// local: scan_dirty()
uint64_t offset, end_offset, submit_offset, submit_len;
friend class journal_flusher_t;
bool scan_dirty(int wait_base);
void scan_dirty();
bool read_dirty(int wait_base);
bool modify_meta_do_reads(int wait_base);
bool wait_meta_reads(int wait_base);
bool modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base);
bool clear_incomplete_csum_block_bits(int wait_base);
void calc_block_checksums(uint32_t *new_data_csums, bool skip_overwrites);
void update_metadata_entry();
bool write_meta_block(flusher_meta_write_t & meta_block, int wait_base);
void update_clean_db();
void free_data_blocks();
bool fsync_batch(bool fsync_meta, int wait_base);
bool trim_journal(int wait_base);
void free_buffers();
public:
journal_flusher_co();
bool loop();
@ -97,9 +115,10 @@ class journal_flusher_t
std::map<uint64_t, meta_sector_t> meta_sectors;
std::deque<object_id> flush_queue;
std::map<object_id, uint64_t> flush_versions;
std::map<object_id, uint64_t> flush_versions; // FIXME: consider unordered_map?
bool try_find_older(std::map<obj_ver_id, dirty_entry>::iterator & dirty_end, obj_ver_id & cur);
bool try_find_other(std::map<obj_ver_id, dirty_entry>::iterator & dirty_end, obj_ver_id & cur);
public:
journal_flusher_t(blockstore_impl_t *bs);


@ -177,15 +177,23 @@ struct __attribute__((__packed__)) dirty_entry
// Suspend operation until there is some free space on the data device
#define WAIT_FREE 5
struct fulfill_read_t
struct used_clean_obj_t
{
uint64_t offset, len;
uint64_t journal_sector; // sector+1 if used and !journal.inmemory, otherwise 0
uint32_t item_state;
uint64_t disk_offset;
void *csum;
int refs;
uint64_t freed_block; // block+1 if freed, otherwise 0
uint8_t *meta; // metadata copy
};
// https://github.com/algorithm-ninja/cpp-btree
// https://github.com/greg7mdp/sparsepp/ was used previously, but it was TERRIBLY slow after resizing
// with sparsepp, random reads dropped to ~700 iops very fast with just as much as ~32k objects in the DB
typedef btree::btree_map<object_id, clean_entry> blockstore_clean_db_t;
typedef std::map<obj_ver_id, dirty_entry> blockstore_dirty_db_t;
#include "blockstore_init.h"
#include "blockstore_flush.h"
#define PRIV(op) ((blockstore_op_private_t*)(op)->private_data)
#define FINISH_OP(op) PRIV(op)->~blockstore_op_private_t(); std::function<void (blockstore_op_t*)>(op->callback)(op)
@ -198,7 +206,8 @@ struct blockstore_op_private_t
int op_state;
// Read
std::vector<fulfill_read_t> read_vec;
uint64_t clean_version_used;
std::vector<copy_buffer_t> read_vec;
// Sync, write
int min_flushed_journal_sector, max_flushed_journal_sector;
@ -214,16 +223,6 @@ struct blockstore_op_private_t
int sync_small_checked, sync_big_checked;
};
// https://github.com/algorithm-ninja/cpp-btree
// https://github.com/greg7mdp/sparsepp/ was used previously, but it was TERRIBLY slow after resizing
// with sparsepp, random reads dropped to ~700 iops very fast with just as much as ~32k objects in the DB
typedef btree::btree_map<object_id, clean_entry> blockstore_clean_db_t;
typedef std::map<obj_ver_id, dirty_entry> blockstore_dirty_db_t;
#include "blockstore_init.h"
#include "blockstore_flush.h"
typedef uint32_t pool_id_t;
typedef uint64_t pool_pg_id_t;
@ -285,6 +284,9 @@ class blockstore_impl_t
int big_to_flush = 0;
int write_iodepth = 0;
// clean data blocks referenced by read operations
std::map<obj_ver_id, used_clean_obj_t> used_clean_objects;
bool live = false, queue_stall = false;
ring_loop_t *ringloop;
timerfd_manager_t *tfd;
@ -327,8 +329,21 @@ class blockstore_impl_t
// Read
int dequeue_read(blockstore_op_t *read_op);
int fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
void find_holes(std::vector<copy_buffer_t> & read_vec, uint32_t item_start, uint32_t item_end,
std::function<int(int, bool, uint32_t, uint32_t)> callback);
int fulfill_read(blockstore_op_t *read_op, uint64_t & fulfilled, uint32_t item_start, uint32_t item_end,
uint32_t item_state, uint64_t item_version, uint64_t item_location, uint64_t journal_sector, uint8_t *csum);
int fill_partial_checksum_blocks(std::vector<copy_buffer_t> & rv, uint64_t & fulfilled,
uint8_t *clean_entry_bitmap, uint8_t *read_buf, uint64_t read_offset, uint64_t read_end);
bool read_range_fulfilled(std::vector<copy_buffer_t> & rv, uint64_t & fulfilled, uint8_t *read_buf,
uint8_t *clean_entry_bitmap, uint32_t item_start, uint32_t item_end);
bool read_clean_checksum_block(blockstore_op_t *op, int rv_pos,
uint64_t &fulfilled, uint64_t clean_loc, uint32_t item_start, uint32_t item_end);
bool verify_padded_checksums(uint8_t *clean_entry_bitmap, uint32_t offset,
iovec *iov, int n_iov, std::function<void(uint32_t, uint32_t, uint32_t)> bad_block_cb);
bool verify_journal_checksums(uint8_t *csums, uint32_t offset,
iovec *iov, int n_iov, std::function<void(uint32_t, uint32_t, uint32_t)> bad_block_cb);
bool verify_read_padded_checksums(blockstore_op_t *op, uint64_t clean_loc, iovec *iov, int n_iov);
int fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len,
uint32_t item_state, uint64_t item_version);
void handle_read_event(ring_data_t *data, blockstore_op_t *op);


@ -796,24 +796,32 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
else
{
uint32_t *block_csums = (uint32_t*)((uint8_t*)je + sizeof(journal_entry_small_write) + bs->dsk.clean_entry_bitmap_size);
uint32_t block_crc32 = 0;
uint32_t start = je->small_write.offset / bs->dsk.csum_block_size;
uint32_t end = (je->small_write.offset+je->small_write.len-1) / bs->dsk.csum_block_size;
int sd_num = 0;
size_t sd_pos = 0;
for (uint64_t pos = 0; pos < je->small_write.len; pos += bs->dsk.csum_block_size, block_csums++)
for (uint32_t pos = start; pos <= end; pos++, block_csums++)
{
size_t block_left = bs->dsk.csum_block_size;
block_crc32 = 0;
size_t block_left = (pos == start
? (start == end
? je->small_write.len
: bs->dsk.csum_block_size - je->small_write.offset%bs->dsk.csum_block_size)
: (pos < end
? bs->dsk.csum_block_size
: (je->small_write.offset + je->small_write.len)%bs->dsk.csum_block_size));
uint32_t block_crc32 = 0;
while (block_left > 0)
{
assert(sd_num < small_write_data.size());
if (small_write_data[sd_num].iov_len >= sd_pos+block_left)
{
block_crc32 = crc32c(block_crc32, small_write_data[sd_num].iov_base+sd_pos, block_left);
block_crc32 = crc32c(block_crc32, (uint8_t*)small_write_data[sd_num].iov_base+sd_pos, block_left);
sd_pos += block_left;
break;
}
else
{
block_crc32 = crc32c(block_crc32, small_write_data[sd_num].iov_base+sd_pos, small_write_data[sd_num].iov_len-sd_pos);
block_crc32 = crc32c(block_crc32, (uint8_t*)small_write_data[sd_num].iov_base+sd_pos, small_write_data[sd_num].iov_len-sd_pos);
block_left -= (small_write_data[sd_num].iov_len-sd_pos);
sd_pos = 0;
sd_num++;
@ -821,8 +829,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
}
if (block_crc32 != *block_csums)
{
printf("Journal entry data is corrupt (block %lu crc32 %x != %x)\n",
pos / bs->dsk.csum_block_size, block_crc32, *block_csums);
printf("Journal entry data is corrupt (block %u crc32 %x != %x)\n",
pos, block_crc32, *block_csums);
data_csum_valid = false;
break;
}
@ -848,7 +856,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
.oid = je->small_write.oid,
.version = je->small_write.version,
};
uint64_t dyn_size = bs->dsk.dirty_dyn_size(je->small_write.len);
uint64_t dyn_size = bs->dsk.dirty_dyn_size(je->small_write.offset, je->small_write.len);
void *dyn = NULL;
void *dyn_from = (uint8_t*)je + sizeof(journal_entry_small_write);
if (dyn_size <= sizeof(void*))
@ -937,7 +945,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
.oid = je->big_write.oid,
.version = je->big_write.version,
};
uint64_t dyn_size = bs->dsk.dirty_dyn_size(je->big_write.len);
uint64_t dyn_size = bs->dsk.dirty_dyn_size(je->big_write.offset, je->big_write.len);
void *dyn = NULL;
void *dyn_from = (uint8_t*)je + sizeof(journal_entry_big_write);
if (dyn_size <= sizeof(void*))


@ -17,6 +17,7 @@ blockstore_journal_check_t::blockstore_journal_check_t(blockstore_impl_t *bs)
// Check if we can write <required> entries of <size> bytes and <data_after> data bytes after them to the journal
int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries_required, int size, int data_after)
{
uint64_t prev_next = next_sector;
int required = entries_required;
while (1)
{
@ -35,11 +36,19 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries
}
required -= fits;
next_in_pos += fits * size;
sectors_to_write++;
if (next_sector != prev_next || !sectors_to_write)
{
// Except the previous call to this function
sectors_to_write++;
}
}
else if (bs->journal.sector_info[next_sector].dirty)
{
sectors_to_write++;
if (next_sector != prev_next || !sectors_to_write)
{
// Except the previous call to this function
sectors_to_write++;
}
}
if (required <= 0)
{
@ -289,3 +298,31 @@ void journal_t::dump_diagnostics()
journal_used_it == used_sectors.end() ? 0 : journal_used_it->second
);
}
static uint64_t zero_page[4096];
uint32_t crc32c_pad(uint32_t prev_crc, const void *buf, size_t len, size_t left_pad, size_t right_pad)
{
uint32_t r = prev_crc;
while (left_pad >= 4096)
{
r = crc32c(r, zero_page, 4096);
left_pad -= 4096;
}
if (left_pad > 0)
r = crc32c(r, zero_page, left_pad);
r = crc32c(r, buf, len);
while (right_pad >= 4096)
{
r = crc32c(r, zero_page, 4096);
right_pad -= 4096;
}
if (right_pad > 0)
r = crc32c(r, zero_page, right_pad);
return r;
}
uint32_t crc32c_nopad(uint32_t prev_crc, const void *buf, size_t len, size_t left_pad, size_t right_pad)
{
return crc32c(0, buf, len);
}
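A hedged sketch of what crc32c_pad() computes (illustrative, not part of the commit): the crc32c of a whole checksum block in which only len bytes in the middle were actually written and the surrounding left_pad/right_pad bytes are zeroes, without materializing the padded buffer. crc32c_nopad(), by contrast, ignores the padding and checksums only the bytes actually written; the write path later in this commit uses it for small (journaled) writes.

#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
// assumes the crc32c() helper and the crc32c_pad() function above

void check_pad_equivalence(const uint8_t *buf, size_t len, size_t lpad, size_t rpad)
{
    size_t total = lpad + len + rpad;
    uint8_t *full = (uint8_t*)calloc(1, total);       // explicit zero padding
    memcpy(full + lpad, buf, len);
    // crc32c is incremental, so feeding zero_page chunks gives the same result
    // as checksumming the fully materialized zero-padded block
    assert(crc32c_pad(0, buf, len, lpad, rpad) == crc32c(0, full, total));
    free(full);
}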


@ -227,3 +227,6 @@ struct blockstore_journal_check_t
};
journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size);
uint32_t crc32c_pad(uint32_t prev_crc, const void *buf, size_t len, size_t left_pad, size_t right_pad);
uint32_t crc32c_nopad(uint32_t prev_crc, const void *buf, size_t len, size_t left_pad, size_t right_pad);


@ -1,6 +1,7 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
#include <limits.h>
#include "blockstore_impl.h"
int blockstore_impl_t::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len,
@ -40,64 +41,98 @@ int blockstore_impl_t::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_
return 1;
}
// FIXME I've seen a bug here so I want some tests
void blockstore_impl_t::find_holes(std::vector<copy_buffer_t> & read_vec,
uint32_t item_start, uint32_t item_end,
std::function<int(int, bool, uint32_t, uint32_t)> callback)
{
auto cur_start = item_start;
auto alloc_start = item_start;
int i = 0;
while (1)
{
// COPY_BUF_CSUM_FILL items are fake items inserted at the end, so their offsets aren't in order
for (; i < read_vec.size() && !(read_vec[i].copy_flags & COPY_BUF_CSUM_FILL); i++)
{
if (read_vec[i].offset >= cur_start)
break;
else if (read_vec[i].offset + read_vec[i].len > cur_start)
{
// Allocated: cur_start .. read_vec[i].offset + read_vec[i].len
cur_start = read_vec[i].offset + read_vec[i].len;
if (cur_start >= item_end)
goto endwhile;
}
}
if (i < read_vec.size() && !(read_vec[i].copy_flags & COPY_BUF_CSUM_FILL) && read_vec[i].offset == cur_start)
{
// Allocated - don't move alloc_start
}
else
{
// Hole
uint32_t cur_end = (i == read_vec.size() || (read_vec[i].copy_flags & COPY_BUF_CSUM_FILL) || read_vec[i].offset >= item_end
? item_end : read_vec[i].offset);
if (alloc_start < cur_start)
i += callback(i, true, alloc_start, cur_start);
i += callback(i, false, cur_start, cur_end);
alloc_start = cur_end;
}
if (i >= read_vec.size() || (read_vec[i].copy_flags & COPY_BUF_CSUM_FILL))
break;
cur_start = read_vec[i].offset + read_vec[i].len;
if (cur_start >= item_end)
break;
}
endwhile:
if (alloc_start < cur_start)
i += callback(i, true, alloc_start, cur_start);
}
int blockstore_impl_t::fulfill_read(blockstore_op_t *read_op,
uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
uint32_t item_state, uint64_t item_version, uint64_t item_location,
uint64_t journal_sector, uint8_t *csum)
{
int r = 1;
uint32_t cur_start = item_start;
if (cur_start < read_op->offset + read_op->len && item_end > read_op->offset)
{
cur_start = cur_start < read_op->offset ? read_op->offset : cur_start;
item_end = item_end > read_op->offset + read_op->len ? read_op->offset + read_op->len : item_end;
auto it = PRIV(read_op)->read_vec.begin();
while (1)
find_holes(PRIV(read_op)->read_vec, cur_start, item_end, [&](int pos, bool alloc, uint32_t start, uint32_t end)
{
for (; it != PRIV(read_op)->read_vec.end(); it++)
if (alloc)
return 0;
copy_buffer_t el = {
.copy_flags = (IS_JOURNAL(item_state) ? COPY_BUF_JOURNAL : COPY_BUF_DATA),
.offset = start,
.len = end-start,
.disk_offset = item_location + el.offset - item_start,
.journal_sector = journal_sector,
.csum_buf = !csum ? NULL : (csum + (cur_start - item_start) / dsk.csum_block_size * (dsk.data_csum_type & 0xFF)),
};
if (IS_BIG_WRITE(item_state))
{
if (it->offset >= cur_start)
{
break;
}
else if (it->offset + it->len > cur_start)
{
cur_start = it->offset + it->len;
if (cur_start >= item_end)
{
goto endwhile;
}
}
// If we don't track it then we may IN THEORY read another object's data:
// submit read -> remove the object -> flush remove -> overwrite with another object -> finish read
// Very improbable, but possible
PRIV(read_op)->clean_version_used = 1;
}
if (it == PRIV(read_op)->read_vec.end() || it->offset > cur_start)
PRIV(read_op)->read_vec.insert(PRIV(read_op)->read_vec.begin() + pos, el);
if (!fulfill_read_push(read_op,
(uint8_t*)read_op->buf + el.offset - read_op->offset,
item_location + el.offset - item_start,
el.len, item_state, item_version))
{
fulfill_read_t el = {
.offset = cur_start,
.len = it == PRIV(read_op)->read_vec.end() || it->offset >= item_end ? item_end-cur_start : it->offset-cur_start,
.journal_sector = journal_sector,
.item_state = item_state,
.disk_offset = item_location + el.offset - item_start,
.csum = !csum ? NULL : (csum + (cur_start - item_start) / dsk.csum_block_size * (dsk.data_csum_type & 0xFF)),
};
it = PRIV(read_op)->read_vec.insert(it, el);
if (!fulfill_read_push(read_op,
(uint8_t*)read_op->buf + el.offset - read_op->offset,
item_location + el.offset - item_start,
el.len, item_state, item_version))
{
return 0;
}
fulfilled += el.len;
PRIV(read_op)->read_vec.clear();
r = 0;
return 0;
}
cur_start = it->offset + it->len;
if (it == PRIV(read_op)->read_vec.end() || cur_start >= item_end)
{
break;
}
}
fulfilled += el.len;
return 1;
});
}
endwhile:
return 1;
return r;
}
uint8_t* blockstore_impl_t::get_clean_entry_bitmap(uint64_t block_loc, int offset)
@ -115,6 +150,175 @@ uint8_t* blockstore_impl_t::get_clean_entry_bitmap(uint64_t block_loc, int offse
return clean_entry_bitmap;
}
int blockstore_impl_t::fill_partial_checksum_blocks(std::vector<copy_buffer_t> & rv, uint64_t & fulfilled,
uint8_t *clean_entry_bitmap, uint8_t *read_buf, uint64_t read_offset, uint64_t read_end)
{
if (read_end == read_offset)
return 0;
int required = 0;
read_buf -= read_offset;
uint32_t last_block = (read_end-1)/dsk.csum_block_size;
uint32_t start_block = read_offset/dsk.csum_block_size;
uint32_t end_block = 0;
while (start_block <= last_block)
{
if (read_range_fulfilled(rv, fulfilled, read_buf, clean_entry_bitmap,
start_block*dsk.csum_block_size < read_offset ? read_offset : start_block*dsk.csum_block_size,
(start_block+1)*dsk.csum_block_size > read_end ? read_end : (start_block+1)*dsk.csum_block_size))
{
// read_range_fulfilled() also adds zero-filled areas
start_block++;
}
else
{
// Find a sequence of checksum blocks required to be read
end_block = start_block;
while ((end_block+1)*dsk.csum_block_size < read_end &&
!read_range_fulfilled(rv, fulfilled, read_buf, clean_entry_bitmap,
(end_block+1)*dsk.csum_block_size < read_offset ? read_offset : (end_block+1)*dsk.csum_block_size,
(end_block+2)*dsk.csum_block_size > read_end ? read_end : (end_block+2)*dsk.csum_block_size))
{
end_block++;
}
end_block++;
// OK, mark this range as required
rv.push_back((copy_buffer_t){
.copy_flags = COPY_BUF_CSUM_FILL,
.offset = start_block*dsk.csum_block_size,
.len = (end_block-start_block)*dsk.csum_block_size,
});
start_block = end_block;
required++;
}
}
return required;
}
// read_buf should be == op->buf - op->offset
bool blockstore_impl_t::read_range_fulfilled(std::vector<copy_buffer_t> & rv, uint64_t & fulfilled, uint8_t *read_buf,
uint8_t *clean_entry_bitmap, uint32_t item_start, uint32_t item_end)
{
bool all_done = true;
find_holes(rv, item_start, item_end, [&](int pos, bool alloc, uint32_t cur_start, uint32_t cur_end)
{
if (alloc)
return 0;
int diff = 0;
uint32_t bmp_start = cur_start/dsk.bitmap_granularity;
uint32_t bmp_end = cur_end/dsk.bitmap_granularity;
uint32_t bmp_pos = bmp_start;
while (bmp_pos < bmp_end)
{
while (bmp_pos < bmp_end && !(clean_entry_bitmap[bmp_pos >> 3] & (1 << (bmp_pos & 0x7))))
bmp_pos++;
if (bmp_pos > bmp_start)
{
// zero fill
copy_buffer_t el = {
.copy_flags = COPY_BUF_ZERO,
.offset = bmp_start*dsk.bitmap_granularity,
.len = (bmp_pos-bmp_start)*dsk.bitmap_granularity,
};
rv.insert(rv.begin() + pos, el);
if (read_buf)
memset(read_buf + el.offset, 0, el.len);
fulfilled += el.len;
diff++;
}
bmp_start = bmp_pos;
while (bmp_pos < bmp_end && (clean_entry_bitmap[bmp_pos >> 3] & (1 << (bmp_pos & 0x7))))
bmp_pos++;
if (bmp_pos > bmp_start)
{
// something is to be read
all_done = false;
}
bmp_start = bmp_pos;
}
return diff;
});
return all_done;
}
bool blockstore_impl_t::read_clean_checksum_block(blockstore_op_t *op, int rv_pos,
uint64_t &fulfilled, uint64_t clean_loc, uint32_t item_start, uint32_t item_end)
{
auto & rv = PRIV(op)->read_vec;
uint32_t fill_size = 0;
int n_iov = 0;
find_holes(rv, item_start, item_end, [&](int pos, bool alloc, uint32_t cur_start, uint32_t cur_end)
{
if (alloc)
fill_size += cur_end-cur_start;
n_iov++;
return 0;
});
void *buf = memalign_or_die(MEM_ALIGNMENT, fill_size + n_iov*sizeof(struct iovec));
iovec *iov = (struct iovec*)((uint8_t*)buf+fill_size);
n_iov = 0;
fill_size = 0;
find_holes(rv, item_start, item_end, [&](int pos, bool alloc, uint32_t cur_start, uint32_t cur_end)
{
if (alloc)
{
iov[n_iov++] = (struct iovec){ (uint8_t*)buf+fill_size, cur_end-cur_start };
fill_size += cur_end-cur_start;
}
else
{
iov[n_iov++] = (struct iovec){ (uint8_t*)op->buf+cur_start-op->offset, cur_end-cur_start };
rv.insert(rv.begin() + pos, (copy_buffer_t){
.copy_flags = COPY_BUF_DATA,
.offset = cur_start,
.len = cur_end-cur_start,
.disk_offset = clean_loc,
});
fulfilled += cur_end-cur_start;
return 1;
}
return 0;
});
// Save buf into read_vec too but in a creepy way
// FIXME: Shit, something else should be invented %)
rv[rv.size()-rv_pos] = (copy_buffer_t){
.copy_flags = COPY_BUF_CSUM_FILL,
.offset = 0xffffffff,
.len = ((uint64_t)n_iov << 32) | fill_size,
.disk_offset = clean_loc + item_start,
.csum_buf = (uint8_t*)buf,
};
uint32_t d_pos = 0;
for (int n_pos = 0; n_pos < n_iov; n_pos += IOV_MAX)
{
int n_cur = n_iov-n_pos < IOV_MAX ? n_iov-n_pos : IOV_MAX;
BS_SUBMIT_GET_SQE(sqe, data);
PRIV(op)->pending_ops++;
my_uring_prep_readv(
sqe, dsk.data_fd, iov + n_pos, n_cur, dsk.data_offset + clean_loc + d_pos
);
data->callback = [this, op](ring_data_t *data) { handle_read_event(data, op); };
if (n_pos > 0 || n_pos + IOV_MAX < n_iov)
{
uint32_t d_len = 0;
for (int i = 0; i < IOV_MAX; i++)
d_len += iov[n_pos+i].iov_len;
data->iov.iov_len = d_len;
d_pos += d_len;
}
else
data->iov.iov_len = item_end-item_start;
}
// Reading may race with flushing.
// - Flushing happens in 3 steps: (2) punch holes in meta -> (4) update data -> (6) update meta
// - Reading may start/end at: 1/3, 1/5, 1/7, 3/5, 3/7, 5/7
// - 1/3, 1/5, 3/5 are not a problem because we'll check data using punched bitmap and CRCs
// - For 1/7, 3/7 and 5/7 to finish correctly we need a copy of punched metadata
// otherwise the checksum may not match
// So flushers save a copy of punched metadata if the object is being read during (6).
PRIV(op)->clean_version_used = 1;
return true;
}
int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
{
auto & clean_db = clean_db_shard(read_op->oid);
@ -136,6 +340,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
}
uint64_t fulfilled = 0;
PRIV(read_op)->pending_ops = 0;
PRIV(read_op)->clean_version_used = 0;
uint64_t result_version = 0;
if (dirty_found)
{
@ -153,7 +358,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
FINISH_OP(read_op);
return 2;
}
size_t dyn_size = dsk.dirty_dyn_size(dirty.len);
size_t dyn_size = dsk.dirty_dyn_size(dirty.offset, dirty.len);
uint8_t *bmp_ptr = (uint8_t*)(dyn_size > sizeof(void*) ? dirty.dyn_data : &dirty.dyn_data);
if (!result_version)
{
@ -164,6 +369,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
}
}
// If inmemory_journal is false, journal trim will have to wait until the read is completed
// FIXME: Verify checksums when reading from journal disk
if (!fulfill_read(read_op, fulfilled, dirty.offset, dirty.offset + dirty.len,
dirty.state, dirty_it->first.version, dirty.location + (IS_JOURNAL(dirty.state) ? 0 : dirty.offset),
(IS_JOURNAL(dirty.state) ? dirty.journal_sector+1 : 0),
@ -206,6 +412,30 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
return 0;
}
}
else if (dsk.csum_block_size > dsk.bitmap_granularity)
{
auto & rv = PRIV(read_op)->read_vec;
int req = fill_partial_checksum_blocks(rv, fulfilled, clean_entry_bitmap,
(uint8_t*)read_op->buf, read_op->offset, read_op->offset+read_op->len);
for (int i = req; i > 0; i--)
{
auto & vi = rv[rv.size()-i];
if (!read_clean_checksum_block(read_op, i, fulfilled, clean_it->second.location, vi.offset, vi.offset+vi.len))
{
// need to wait. undo added requests, don't dequeue op
for (auto & vec: rv)
{
if (vec.copy_flags == COPY_BUF_CSUM_FILL && vec.csum_buf)
{
free(vec.csum_buf);
vec.csum_buf = NULL;
}
}
rv.clear();
return 0;
}
}
}
else
{
uint64_t bmp_start = 0, bmp_end = 0, bmp_size = dsk.data_block_size/dsk.bitmap_granularity;
@ -243,6 +473,13 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
}
}
}
// Increment counter if clean data is being read from the disk
if (PRIV(read_op)->clean_version_used)
{
obj_ver_id ov = { .oid = read_op->oid, .version = clean_it->second.version };
used_clean_objects[ov].refs++;
PRIV(read_op)->clean_version_used = ov.version;
}
}
}
if (!result_version)
@ -284,6 +521,114 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
return 2;
}
bool blockstore_impl_t::verify_padded_checksums(uint8_t *clean_entry_bitmap, uint32_t offset,
iovec *iov, int n_iov, std::function<void(uint32_t, uint32_t, uint32_t)> bad_block_cb)
{
assert(!(offset % dsk.csum_block_size));
uint32_t *csums = (uint32_t*)(clean_entry_bitmap + 2*dsk.clean_entry_bitmap_size);
uint32_t block_csum = 0;
uint32_t block_done = 0;
uint32_t block_num = clean_entry_bitmap ? offset/dsk.csum_block_size : 0;
uint32_t bmp_pos = offset/dsk.bitmap_granularity;
for (int i = 0; i < n_iov; i++)
{
uint32_t pos = 0;
while (pos < iov[i].iov_len)
{
uint32_t start = pos;
uint8_t bit = (clean_entry_bitmap[bmp_pos >> 3] >> (bmp_pos & 0x7)) & 1;
while (pos < iov[i].iov_len && ((clean_entry_bitmap[bmp_pos >> 3] >> (bmp_pos & 0x7)) & 1) == bit)
{
pos += dsk.bitmap_granularity;
bmp_pos++;
}
uint32_t len = pos-start;
auto buf = (uint8_t*)iov[i].iov_base+start;
while (block_done+len >= dsk.csum_block_size)
{
auto cur_len = dsk.csum_block_size-block_done;
block_csum = crc32c_pad(block_csum, buf, bit ? cur_len : 0, bit ? 0 : cur_len, 0);
if (block_csum != csums[block_num])
{
if (bad_block_cb)
bad_block_cb(block_num*dsk.csum_block_size, block_csum, csums[block_num]);
else
return false;
}
block_num++;
buf += cur_len;
len -= cur_len;
block_done = block_csum = 0;
}
if (len > 0)
{
block_csum = crc32c_pad(block_csum, buf, bit ? len : 0, bit ? 0 : len, 0);
block_done += len;
}
}
}
assert(!block_done);
return true;
}
bool blockstore_impl_t::verify_journal_checksums(uint8_t *csums, uint32_t offset,
iovec *iov, int n_iov, std::function<void(uint32_t, uint32_t, uint32_t)> bad_block_cb)
{
uint32_t block_csum = 0;
uint32_t block_num = 0;
uint32_t block_done = offset%dsk.csum_block_size;
for (int i = 0; i < n_iov; i++)
{
uint32_t len = iov[i].iov_len;
auto buf = (uint8_t*)iov[i].iov_base;
while (block_done+len >= dsk.csum_block_size)
{
auto cur_len = dsk.csum_block_size-block_done;
block_csum = crc32c(block_csum, buf, cur_len);
if (block_csum != ((uint32_t*)csums)[block_num])
{
if (bad_block_cb)
bad_block_cb(block_num*dsk.csum_block_size - (offset%dsk.csum_block_size), block_csum, ((uint32_t*)csums)[block_num]);
else
return false;
}
block_num++;
buf += cur_len;
len -= cur_len;
block_done = block_csum = 0;
}
if (len > 0)
{
block_csum = crc32c(block_csum, buf, len);
block_done += len;
}
}
if (block_done > 0 && block_csum != ((uint32_t*)csums)[block_num])
{
if (bad_block_cb)
bad_block_cb(block_num*dsk.csum_block_size - (offset%dsk.csum_block_size), block_csum, ((uint32_t*)csums)[block_num]);
else
return false;
}
return true;
}
bool blockstore_impl_t::verify_read_padded_checksums(blockstore_op_t *op, uint64_t clean_loc, iovec *iov, int n_iov)
{
uint32_t offset = clean_loc % dsk.data_block_size;
clean_loc = (clean_loc >> dsk.block_order) << dsk.block_order;
// First verify against the newest checksum version
uint8_t *clean_entry_bitmap = get_clean_entry_bitmap(clean_loc, 0);
if (verify_padded_checksums(clean_entry_bitmap, offset, iov, n_iov, NULL))
return true;
// Check through all relevant "metadata backups" possibly added by flushers
auto mb_it = used_clean_objects.lower_bound((obj_ver_id){ .oid = op->oid, .version = PRIV(op)->clean_version_used });
for (; mb_it != used_clean_objects.end() && mb_it->first.oid == op->oid; mb_it++)
if (mb_it->second.meta != NULL && verify_padded_checksums(mb_it->second.meta, offset, iov, n_iov, NULL))
return true;
return false;
}
void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op)
{
live = true;
@ -293,43 +638,97 @@ void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op
// read error
op->retval = data->res;
}
else if (dsk.csum_block_size)
if (PRIV(op)->pending_ops == 0)
{
// verify checksum if required
uint64_t el_offset = (uint8_t*)data->iov.iov_base - ((uint8_t*)op->buf - op->offset);
auto & vecs = PRIV(op)->read_vec;
int el_min = 0, el_max = vecs.size();
while (el_max > el_min+1)
if (dsk.csum_block_size)
{
int mid = (el_min+el_max)/2;
if (el_offset < vecs[mid].offset)
el_max = mid;
else
el_min = mid;
}
auto & vec = vecs[el_min];
assert(vec.offset == el_offset);
if (vec.csum)
{
uint32_t *csum = (uint32_t*)vec.csum;
for (size_t p = 0; p < data->iov.iov_len; p += dsk.csum_block_size, csum++)
// verify checksums if required
auto & rv = PRIV(op)->read_vec;
if (dsk.csum_block_size > dsk.bitmap_granularity)
{
if (crc32c(0, data->iov.iov_base + p, dsk.csum_block_size) != *csum)
for (int i = rv.size()-1; i >= 0 && rv[i].copy_flags == COPY_BUF_CSUM_FILL; i--)
{
// checksum error
printf(
"Checksum mismatch in %s area at offset 0x%lx: %08lx vs %08lx\n",
IS_JOURNAL(vec.item_state) ? "journal" : "data",
crc32c(0, data->iov.iov_base + p, dsk.csum_block_size), *csum
);
op->retval = -EDOM;
break;
struct iovec *iov = (struct iovec*)(rv[i].csum_buf + (rv[i].len & 0xFFFFFFFF));
if (!verify_read_padded_checksums(op, rv[i].disk_offset, iov, rv[i].len >> 32))
op->retval = -EDOM;
free(rv[i].csum_buf);
rv[i].csum_buf = NULL;
}
}
else
{
for (auto & vec: rv)
{
if (vec.csum_buf)
{
uint32_t *csum = (uint32_t*)vec.csum_buf;
for (size_t p = 0; p < data->iov.iov_len; p += dsk.csum_block_size, csum++)
{
if (crc32c(0, (uint8_t*)data->iov.iov_base + p, dsk.csum_block_size) != *csum)
{
// checksum error
printf(
"Checksum mismatch in object %lx:%lx v%lu in %s area at offset 0x%lx: %08x vs %08x\n",
op->oid.inode, op->oid.stripe, op->version,
(vec.copy_flags & COPY_BUF_JOURNAL) ? "journal" : "data", vec.disk_offset,
crc32c(0, (uint8_t*)data->iov.iov_base + p, dsk.csum_block_size), *csum
);
op->retval = -EDOM;
break;
}
}
}
}
}
}
if (PRIV(op)->clean_version_used)
{
// Release clean data block
obj_ver_id ov = { .oid = op->oid, .version = PRIV(op)->clean_version_used };
auto uo_it = used_clean_objects.find(ov);
if (uo_it != used_clean_objects.end())
{
uo_it->second.refs--;
if (uo_it->second.refs <= 0)
{
// Check to the left - even older usage entries may exist
bool still_used = false;
while (uo_it != used_clean_objects.begin())
{
uo_it--;
if (uo_it->first.oid != op->oid)
{
uo_it++;
break;
}
if (uo_it->second.refs > 0)
{
still_used = true;
break;
}
}
// Free uo_it AND all following records with refs==0 too
if (!still_used)
{
while (uo_it != used_clean_objects.end() &&
uo_it->first.oid == op->oid &&
uo_it->second.refs == 0)
{
if (uo_it->second.freed_block > 0)
{
data_alloc->set(uo_it->second.freed_block-1, false);
}
if (uo_it->second.meta)
{
free(uo_it->second.meta);
uo_it->second.meta = NULL;
}
used_clean_objects.erase(uo_it++);
}
}
}
}
}
}
if (PRIV(op)->pending_ops == 0)
{
if (!journal.inmemory)
{
// Release journal sector usage
@ -370,7 +769,7 @@ int blockstore_impl_t::read_bitmap(object_id oid, uint64_t target_version, void
*result_version = dirty_it->first.version;
if (bitmap)
{
size_t dyn_size = dsk.dirty_dyn_size(dirty_it->second.len);
size_t dyn_size = dsk.dirty_dyn_size(dirty_it->second.offset, dirty_it->second.len);
void *dyn_ptr = (dyn_size > sizeof(void*) ? dirty_it->second.dyn_data : &dirty_it->second.dyn_data);
memcpy(bitmap, dyn_ptr, dsk.clean_entry_bitmap_size);
}
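For orientation, a minimal sketch of the invariant that verify_padded_checksums() above relies on (illustrative, not part of the commit; padded_block_csum is a hypothetical helper): the stored checksum of a csum block equals the crc32c of that block with every granule not marked in the bitmap treated as zeroes.

#include <stdint.h>
// assumes crc32c()/crc32c_pad() from the journal header; block_data points to the
// start of the checksum block, block_num is its index inside the object
uint32_t padded_block_csum(const uint8_t *block_data, const uint8_t *bitmap,
    uint32_t block_num, uint32_t csum_block_size, uint32_t bitmap_granularity)
{
    uint32_t csum = 0;
    uint32_t granules = csum_block_size / bitmap_granularity;
    for (uint32_t g = 0; g < granules; g++)
    {
        uint32_t bit = block_num*granules + g;
        bool written = bitmap[bit >> 3] & (1 << (bit & 7));
        // written granules contribute their data, unwritten ones count as zeroes
        csum = crc32c_pad(csum, block_data + g*bitmap_granularity,
            written ? bitmap_granularity : 0, written ? 0 : bitmap_granularity, 0);
    }
    return csum;
}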


@ -241,10 +241,10 @@ void blockstore_impl_t::free_dirty_dyn_data(dirty_entry & e)
{
if (e.dyn_data)
{
size_t dyn_size = dsk.dirty_dyn_size(e.len);
size_t dyn_size = dsk.dirty_dyn_size(e.offset, e.len);
if (dyn_size > sizeof(void*) &&
(!journal.inmemory || e.dyn_data < journal.buffer ||
e.dyn_data >= journal.buffer + journal.len))
e.dyn_data >= (uint8_t*)journal.buffer + journal.len))
{
// dyn_data contains the bitmap and checksums
// free it if it doesn't refer to the in-memory journal


@ -78,7 +78,23 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
// 2nd step: Data device is synced, prepare & write journal entries
// Check space in the journal and journal memory buffers
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(),
if (dsk.csum_block_size)
{
// More complex check because all journal entries have different lengths
int left = PRIV(op)->sync_big_writes.size();
for (auto & sbw: PRIV(op)->sync_big_writes)
{
left--;
auto & dirty_entry = dirty_db.at(sbw);
uint64_t dyn_size = dsk.dirty_dyn_size(dirty_entry.offset, dirty_entry.len);
if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size,
left == 0 ? JOURNAL_STABILIZE_RESERVATION : 0))
{
return 0;
}
}
}
else if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(),
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, JOURNAL_STABILIZE_RESERVATION))
{
return 0;
@ -90,16 +106,17 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
int s = 0;
while (it != PRIV(op)->sync_big_writes.end())
{
if (!journal.entry_fits(sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size) &&
auto & dirty_entry = dirty_db.at(*it);
uint64_t dyn_size = dsk.dirty_dyn_size(dirty_entry.offset, dirty_entry.len);
if (!journal.entry_fits(sizeof(journal_entry_big_write) + dyn_size) &&
journal.sector_info[journal.cur_sector].dirty)
{
prepare_journal_sector_write(journal.cur_sector, op);
s++;
}
auto & dirty_entry = dirty_db.at(*it);
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
journal, (dirty_entry.state & BS_ST_INSTANT) ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size
sizeof(journal_entry_big_write) + dyn_size
);
dirty_entry.journal_sector = journal.sector_info[journal.cur_sector].offset;
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
@ -115,7 +132,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
je->offset = dirty_entry.offset;
je->len = dirty_entry.len;
je->location = dirty_entry.location;
uint64_t dyn_size = dsk.dirty_dyn_size(dirty_entry.len);
memcpy((void*)(je+1), (dyn_size > sizeof(void*) ? dirty_entry.dyn_data : &dirty_entry.dyn_data), dyn_size);
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;


@ -13,7 +13,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
{
op->len = 0;
}
size_t dyn_size = dsk.dirty_dyn_size(op->len);
size_t dyn_size = dsk.dirty_dyn_size(op->offset, op->len);
if (!is_del && dyn_size > sizeof(void*))
{
dyn = calloc_or_die(1, dyn_size);
@ -39,7 +39,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
: ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG);
if (!is_del && !deleted)
{
void *dyn_from = dsk.dirty_dyn_size(dirty_it->second.len) > sizeof(void*)
void *dyn_from = dsk.dirty_dyn_size(dirty_it->second.offset, dirty_it->second.len) > sizeof(void*)
? dirty_it->second.dyn_data : &dirty_it->second.dyn_data;
memcpy(dyn_ptr, dyn_from, dsk.clean_entry_bitmap_size);
}
@ -163,7 +163,6 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
if (op->bitmap)
{
// Only allow to overwrite part of the object bitmap respective to the write's offset/len
// FIXME Don't merge bitmaps here - parallel writes into one object may break the bitmap
uint32_t bit = op->offset/dsk.bitmap_granularity;
uint32_t bits_left = op->len/dsk.bitmap_granularity;
while (!(bit % 8) && bits_left >= 8)
@ -184,13 +183,25 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
}
}
// Calculate checksums
// FIXME: Allow to receive them from outside
if (dsk.data_csum_type)
// FIXME: Allow to receive checksums from outside?
if (!is_del && dsk.data_csum_type && op->len > 0)
{
uint32_t *data_csum = (uint32_t*)(dyn_ptr + dsk.clean_entry_bitmap_size);
for (uint32_t i = 0; i < op->len / dsk.csum_block_size; i++)
uint32_t *data_csums = (uint32_t*)(dyn_ptr + dsk.clean_entry_bitmap_size);
uint32_t start = op->offset / dsk.csum_block_size;
uint32_t end = (op->offset+op->len-1) / dsk.csum_block_size;
auto fn = state & BS_ST_BIG_WRITE ? crc32c_pad : crc32c_nopad;
if (start == end)
data_csums[0] = fn(0, op->buf, op->len, op->offset - start*dsk.csum_block_size, end*dsk.csum_block_size - (op->offset+op->len));
else
{
data_csum[i] = crc32c(0, op->buf + i*dsk.csum_block_size, dsk.csum_block_size);
data_csums[0] = fn(0, op->buf, dsk.csum_block_size*(start+1)-op->offset, op->offset - start*dsk.csum_block_size, 0);
for (uint32_t i = start+1; i < end; i++)
data_csums[i-start] = crc32c(0, (uint8_t*)op->buf + dsk.csum_block_size*i-op->offset, dsk.csum_block_size);
data_csums[end-start] = fn(
0, (uint8_t*)op->buf + end*dsk.csum_block_size - op->offset,
op->offset+op->len - end*dsk.csum_block_size,
0, (end+1)*dsk.csum_block_size - (op->offset+op->len)
);
}
}
dirty_db.emplace((obj_ver_id){
@ -377,12 +388,13 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{
// Small (journaled) write
// First check if the journal has sufficient space
uint64_t dyn_size = dsk.dirty_dyn_size(op->offset, op->len);
blockstore_journal_check_t space_check(this);
if (unsynced_big_write_count &&
!space_check.check_available(op, unsynced_big_write_count,
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, 0)
sizeof(journal_entry_big_write) + dsk.dirty_dyn_size(0, dsk.data_block_size), 0)
|| !space_check.check_available(op, 1,
sizeof(journal_entry_small_write) + dsk.clean_entry_bitmap_size,
sizeof(journal_entry_small_write) + dyn_size,
op->len + ((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION)))
{
return 0;
@ -391,7 +403,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
BS_SUBMIT_CHECK_SQES(
// Write current journal sector only if it's dirty and full, or in the immediate_commit mode
(immediate_commit != IMMEDIATE_NONE ||
!journal.entry_fits(sizeof(journal_entry_small_write) + dsk.clean_entry_bitmap_size) ? 1 : 0) +
!journal.entry_fits(sizeof(journal_entry_small_write) + dyn_size) ? 1 : 0) +
(op->len > 0 ? 1 : 0)
);
write_iodepth++;
@ -399,7 +411,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
if (immediate_commit == IMMEDIATE_NONE)
{
if (!journal.entry_fits(sizeof(journal_entry_small_write) + dsk.clean_entry_bitmap_size))
if (!journal.entry_fits(sizeof(journal_entry_small_write) + dyn_size))
{
prepare_journal_sector_write(journal.cur_sector, op);
}
@ -409,7 +421,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
}
}
// Then pre-fill journal entry
uint64_t dyn_size = dsk.dirty_dyn_size(op->len);
journal_entry_small_write *je = (journal_entry_small_write*)prefill_single_journal_entry(
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_SMALL_WRITE_INSTANT : JE_SMALL_WRITE,
sizeof(journal_entry_small_write) + dyn_size
@ -516,20 +527,14 @@ resume_2:
.version = op->version,
});
assert(dirty_it != dirty_db.end());
uint64_t dyn_size = dsk.dirty_dyn_size(op->offset, op->len);
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, 1,
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size,
if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size,
((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION)))
{
return 0;
}
BS_SUBMIT_CHECK_SQES(1);
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
assert(dirty_it != dirty_db.end());
uint64_t dyn_size = dsk.dirty_dyn_size(op->len);
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
sizeof(journal_entry_big_write) + dyn_size
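To make the first/last-block handling in enqueue_write() above concrete, a worked example with hypothetical numbers: csum_block_size = 32 KiB and a big write of 40 KiB at offset 28 KiB. Then start = 28672/32768 = 0 and end = (28672+40960-1)/32768 = 2, so three checksums are stored: data_csums[0] = crc32c_pad(0, buf, 4096, 28672, 0) covers the written tail of block 0 with the unwritten 28 KiB counted as zeroes, data_csums[1] is the plain crc32c of the full 32 KiB of block 1, and data_csums[2] is crc32c_pad over the first 4 KiB of block 2 with 28 KiB of zero padding on the right. A small (journaled) write with the same geometry would go through crc32c_nopad instead, so its first and last checksums would cover only the bytes actually written.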


@ -205,7 +205,7 @@ int disk_tool_t::process_journal_block(void *buf, std::function<void(int, journa
uint32_t *block_csums = (uint32_t*)((uint8_t*)je + je->size - data_csum_size);
for (uint32_t pos = 0; pos < je->small_write.len; pos += je_start.csum_block_size, block_csums++)
{
if (crc32c(0, small_write_data + pos, je_start.csum_block_size) != *block_csums)
if (crc32c(0, (uint8_t*)small_write_data + pos, je_start.csum_block_size) != *block_csums)
{
data_csum_valid = false;
break;


@ -27,7 +27,9 @@ fi
start_osd()
{
local i=$1
build/src/vitastor-osd --osd_num $i --bind_address 127.0.0.1 $NO_SAME $OSD_ARGS --etcd_address $ETCD_URL $(build/src/vitastor-disk simple-offsets --format options ./testdata/test_osd$i.bin $OFFSET_ARGS 2>/dev/null) >>./testdata/osd$i.log 2>&1 &
build/src/vitastor-osd --osd_num $i --bind_address 127.0.0.1 $NO_SAME $OSD_ARGS --etcd_address $ETCD_URL \
$(build/src/vitastor-disk simple-offsets --format options ./testdata/test_osd$i.bin $OFFSET_ARGS 2>/dev/null) \
>>./testdata/osd$i.log 2>&1 &
eval OSD${i}_PID=$!
}


@ -12,6 +12,7 @@ PG_COUNT=32
. `dirname $0`/run_3osds.sh
check_qemu
# FIXME: Fix space rebalance priorities :)
IMG_SIZE=960
$ETCDCTL put /vitastor/config/inode/1/1 '{"name":"testimg","size":'$((IMG_SIZE*1024*1024))'}'
@ -24,6 +25,7 @@ kill_osds()
{
sleep 5
echo Killing OSD 1
kill -9 $OSD1_PID
$ETCDCTL del /vitastor/osd/state/1
@ -38,6 +40,7 @@ kill_osds()
done
sleep 5
echo Starting OSD 7
start_osd 7
sleep 5