Extract 512 to constants

blocking-uring-test
Vitaliy Filippov 2020-01-06 14:11:47 +03:00
parent f3e3f8f005
commit bf3eecc159
10 changed files with 62 additions and 59 deletions

View File

@ -17,7 +17,6 @@
#define DEFAULT_ORDER 17
#define MIN_BLOCK_SIZE 4*1024
#define MAX_BLOCK_SIZE 128*1024*1024
#define DISK_ALIGNMENT 512
#define BS_OP_MIN 1
#define BS_OP_READ 1

View File

@ -8,7 +8,7 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
sync_threshold = flusher_count == 1 ? 1 : flusher_count/2;
journal_trim_interval = sync_threshold;
journal_trim_counter = 0;
journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(512, 512);
journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(MEM_ALIGNMENT, JOURNAL_BLOCK_SIZE);
co = new journal_flusher_co[flusher_count];
for (int i = 0; i < flusher_count; i++)
{
@ -211,7 +211,7 @@ bool journal_flusher_co::loop()
{
submit_offset = dirty_it->second.location + offset - dirty_it->second.offset;
submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset;
it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(MEM_ALIGNMENT, submit_len) });
copy_count++;
if (bs->journal.inmemory)
{
@ -374,7 +374,7 @@ bool journal_flusher_co::loop()
}
((clean_disk_entry*)meta_old.buf)[meta_old.pos] = { 0 };
await_sqe(15);
data->iov = (struct iovec){ meta_old.buf, 512 };
data->iov = (struct iovec){ meta_old.buf, META_BLOCK_SIZE };
data->callback = simple_callback_w;
my_uring_prep_writev(
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_old.sector
@ -388,7 +388,7 @@ bool journal_flusher_co::loop()
.version = cur.version,
};
await_sqe(6);
data->iov = (struct iovec){ meta_new.buf, 512 };
data->iov = (struct iovec){ meta_new.buf, META_BLOCK_SIZE };
data->callback = simple_callback_w;
my_uring_prep_writev(
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_new.sector
@ -452,7 +452,7 @@ bool journal_flusher_co::loop()
.journal_start = bs->journal.used_start,
};
((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
data->iov = (struct iovec){ flusher->journal_superblock, 512 };
data->iov = (struct iovec){ flusher->journal_superblock, JOURNAL_BLOCK_SIZE };
data->callback = simple_callback_w;
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
wait_count++;
@ -489,8 +489,8 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_
// If we don't keep all metadata in memory all the time, we must check whether the same sector is already in memory.
// And yet another option is to use LSM trees for metadata, but that complicates everything a lot,
// so I'll avoid it as long as I can.
wr.sector = ((meta_loc >> bs->block_order) / (512 / sizeof(clean_disk_entry))) * 512;
wr.pos = ((meta_loc >> bs->block_order) % (512 / sizeof(clean_disk_entry)));
wr.sector = ((meta_loc >> bs->block_order) / (META_BLOCK_SIZE / sizeof(clean_disk_entry))) * META_BLOCK_SIZE;
wr.pos = ((meta_loc >> bs->block_order) % (META_BLOCK_SIZE / sizeof(clean_disk_entry)));
if (bs->inmemory_meta)
{
wr.buf = bs->metadata_buffer + wr.sector;
@ -500,16 +500,16 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_
if (wr.it == flusher->meta_sectors.end())
{
// Not in memory yet, read it
wr.buf = memalign(512, 512);
wr.buf = memalign(MEM_ALIGNMENT, META_BLOCK_SIZE);
wr.it = flusher->meta_sectors.emplace(wr.sector, (meta_sector_t){
.offset = wr.sector,
.len = 512,
.len = META_BLOCK_SIZE,
.state = 0, // 0 = not read yet
.buf = wr.buf,
.usage_count = 1,
}).first;
await_sqe(0);
data->iov = (struct iovec){ wr.it->second.buf, 512 };
data->iov = (struct iovec){ wr.it->second.buf, META_BLOCK_SIZE };
data->callback = simple_callback_r;
wr.submitted = true;
my_uring_prep_readv(
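A note on the sector/pos arithmetic in modify_meta_read(): meta_loc >> bs->block_order is the data block number, and every META_BLOCK_SIZE-byte metadata block holds META_BLOCK_SIZE / sizeof(clean_disk_entry) entries. A minimal standalone sketch of the same calculation, with an assumed 32-byte entry size (the real sizeof(clean_disk_entry) may differ):

    #include <cstdint>
    #include <cstdio>

    // Illustration of the wr.sector / wr.pos math from modify_meta_read().
    #define META_BLOCK_SIZE 512
    #define ENTRY_SIZE 32   // stand-in for sizeof(clean_disk_entry), assumption for the example

    int main()
    {
        uint64_t block_order = 17;                // 128 KB data blocks (DEFAULT_ORDER)
        uint64_t meta_loc = 37ull << block_order; // so meta_loc >> block_order == 37
        uint64_t entries_per_block = META_BLOCK_SIZE / ENTRY_SIZE;                           // 16
        uint64_t sector = ((meta_loc >> block_order) / entries_per_block) * META_BLOCK_SIZE; // 1024
        uint64_t pos = (meta_loc >> block_order) % entries_per_block;                        // 5
        printf("sector=%llu pos=%llu\n", (unsigned long long)sector, (unsigned long long)pos);
        return 0;
    }

So entry #37 lives at byte offset 1024 of the metadata area, in slot 5 of that block.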

View File

@ -17,7 +17,7 @@ blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *
{
throw std::runtime_error("Bad block size");
}
zero_object = (uint8_t*)memalign(DISK_ALIGNMENT, block_size);
zero_object = (uint8_t*)memalign(MEM_ALIGNMENT, block_size);
data_fd = meta_fd = journal.fd = -1;
try
{
@ -342,7 +342,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
// Allocate memory
op->version = stable_count;
op->retval = total_count;
op->buf = memalign(512, sizeof(obj_ver_id) * total_count);
op->buf = memalign(MEM_ALIGNMENT, sizeof(obj_ver_id) * total_count);
if (!op->buf)
{
op->retval = -ENOMEM;

View File

@ -23,6 +23,11 @@
//#define BLOCKSTORE_DEBUG
#define DISK_ALIGNMENT 512
#define MEM_ALIGNMENT 512
#define JOURNAL_BLOCK_SIZE 512
#define META_BLOCK_SIZE 512
// States are not stored on disk. Instead, they're deduced from the journal
#define ST_J_IN_FLIGHT 1
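All four constants are 512 bytes in this commit, but they name different things: on-disk sector alignment, memory alignment for allocated I/O buffers, journal block size and metadata block size. If the values ever diverge, a few compile-time checks could keep them consistent; a hedged sketch, not part of the commit:

    // Hypothetical sanity checks, not present in the original header.
    static_assert((MEM_ALIGNMENT & (MEM_ALIGNMENT - 1)) == 0, "memalign() needs a power-of-two alignment");
    static_assert(JOURNAL_BLOCK_SIZE % DISK_ALIGNMENT == 0, "journal blocks must be disk-aligned");
    static_assert(META_BLOCK_SIZE % DISK_ALIGNMENT == 0, "metadata blocks must be disk-aligned");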

View File

@ -29,7 +29,7 @@ int blockstore_init_meta::loop()
if (bs->inmemory_meta)
metadata_buffer = bs->metadata_buffer;
else
metadata_buffer = memalign(512, 2*bs->metadata_buf_size);
metadata_buffer = memalign(MEM_ALIGNMENT, 2*bs->metadata_buf_size);
if (!metadata_buffer)
throw std::runtime_error("Failed to allocate metadata read buffer");
while (1)
@ -65,8 +65,8 @@ int blockstore_init_meta::loop()
void *done_buf = bs->inmemory_meta
? (metadata_buffer + done_pos)
: (metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0));
unsigned count = 512 / sizeof(clean_disk_entry);
for (int sector = 0; sector < done_len; sector += 512)
unsigned count = META_BLOCK_SIZE / sizeof(clean_disk_entry);
for (int sector = 0; sector < done_len; sector += META_BLOCK_SIZE)
{
clean_disk_entry *entries = (clean_disk_entry*)(done_buf + sector);
// handle <count> entries
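The actual per-entry handling is not shown in this hunk (only the "handle <count> entries" comment is). Roughly, each META_BLOCK_SIZE block of the read buffer is scanned and non-empty entries populate the clean-object index. A hedged sketch of what that inner loop might look like; the zero-inode test and note_clean_object() are placeholders, not the real code:

    // Hypothetical expansion of "handle <count> entries".
    static void scan_meta_buffer(blockstore_impl_t *bs, void *done_buf, uint64_t done_len)
    {
        unsigned count = META_BLOCK_SIZE / sizeof(clean_disk_entry);
        for (uint64_t sector = 0; sector < done_len; sector += META_BLOCK_SIZE)
        {
            clean_disk_entry *entries = (clean_disk_entry*)(done_buf + sector);
            for (unsigned i = 0; i < count; i++)
            {
                if (entries[i].oid.inode != 0)         // assumption: zero inode marks a free slot
                    note_clean_object(bs, entries[i]); // placeholder for the real bookkeeping
            }
        }
    }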
@ -167,7 +167,7 @@ void blockstore_init_journal::handle_event(ring_data_t *data1)
if (journal_pos >= bs->journal.len)
{
// Continue from the beginning
journal_pos = 512;
journal_pos = JOURNAL_BLOCK_SIZE;
wrapped = true;
}
submitted_buf = NULL;
@ -194,7 +194,7 @@ int blockstore_init_journal::loop()
printf("Reading blockstore journal\n");
if (!bs->journal.inmemory)
{
submitted_buf = memalign(512, 1024);
submitted_buf = memalign(MEM_ALIGNMENT, 2*JOURNAL_BLOCK_SIZE);
if (!submitted_buf)
throw std::bad_alloc();
}
@ -205,7 +205,7 @@ int blockstore_init_journal::loop()
if (!sqe)
throw std::runtime_error("io_uring is full while trying to read journal");
data = ((ring_data_t*)sqe->user_data);
data->iov = { submitted_buf, 512 };
data->iov = { submitted_buf, JOURNAL_BLOCK_SIZE };
data->callback = simple_callback;
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
bs->ringloop->submit();
@ -219,18 +219,18 @@ resume_1:
if (iszero((uint64_t*)submitted_buf, 3))
{
// Journal is empty
// FIXME handle this wrapping to 512 better
bs->journal.used_start = 512;
bs->journal.next_free = 512;
// FIXME handle this wrapping to JOURNAL_BLOCK_SIZE better (maybe)
bs->journal.used_start = JOURNAL_BLOCK_SIZE;
bs->journal.next_free = JOURNAL_BLOCK_SIZE;
// Initialize journal "superblock" and the first block
memset(submitted_buf, 0, 1024);
memset(submitted_buf, 0, 2*JOURNAL_BLOCK_SIZE);
*((journal_entry_start*)submitted_buf) = {
.crc32 = 0,
.magic = JOURNAL_MAGIC,
.type = JE_START,
.size = sizeof(journal_entry_start),
.reserved = 0,
.journal_start = 512,
.journal_start = JOURNAL_BLOCK_SIZE,
};
((journal_entry_start*)submitted_buf)->crc32 = je_crc32((journal_entry*)submitted_buf);
if (bs->readonly)
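The JE_START record written here acts as the journal superblock: it occupies the first JOURNAL_BLOCK_SIZE bytes, points at journal_start, and carries a crc32 over the entry. A hedged sketch of how such a block could be validated on reopen (assuming je_crc32() excludes the stored crc field, as the write path above implies):

    // Hypothetical validation of a journal superblock read into 'buf' (JOURNAL_BLOCK_SIZE bytes).
    static bool check_journal_start(void *buf)
    {
        journal_entry_start *je = (journal_entry_start*)buf;
        if (je->magic != JOURNAL_MAGIC || je->type != JE_START ||
            je->size != sizeof(journal_entry_start))
            return false;
        if (je_crc32((journal_entry*)je) != je->crc32)
            return false;
        // entries begin at je->journal_start (JOURNAL_BLOCK_SIZE in a freshly formatted journal)
        return je->journal_start >= JOURNAL_BLOCK_SIZE;
    }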
@ -242,7 +242,7 @@ resume_1:
// Cool effect. Same operations result in journal replay.
// FIXME: Randomize initial crc32. Track crc32 when trimming.
GET_SQE();
data->iov = (struct iovec){ submitted_buf, 1024 };
data->iov = (struct iovec){ submitted_buf, 2*JOURNAL_BLOCK_SIZE };
data->callback = simple_callback;
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
wait_count++;
@ -301,7 +301,7 @@ resume_1:
if (journal_pos < bs->journal.used_start)
end = bs->journal.used_start;
if (!bs->journal.inmemory)
submitted_buf = memalign(512, JOURNAL_BUFFER_SIZE);
submitted_buf = memalign(MEM_ALIGNMENT, JOURNAL_BUFFER_SIZE);
else
submitted_buf = bs->journal.buffer + journal_pos;
data->iov = {
@ -322,7 +322,7 @@ resume_1:
if (init_write_buf && !bs->readonly)
{
GET_SQE();
data->iov = { init_write_buf, 512 };
data->iov = { init_write_buf, JOURNAL_BLOCK_SIZE };
data->callback = simple_callback;
wait_count++;
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + init_write_sector);
@ -389,8 +389,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
uint64_t proc_pos, pos;
if (continue_pos != 0)
{
proc_pos = (continue_pos / 512) * 512;
pos = continue_pos % 512;
proc_pos = (continue_pos / JOURNAL_BLOCK_SIZE) * JOURNAL_BLOCK_SIZE;
pos = continue_pos % JOURNAL_BLOCK_SIZE;
continue_pos = 0;
goto resume;
}
@ -398,13 +398,13 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
{
proc_pos = next_free;
pos = 0;
next_free += 512;
next_free += JOURNAL_BLOCK_SIZE;
if (next_free >= bs->journal.len)
{
next_free = 512;
next_free = JOURNAL_BLOCK_SIZE;
}
resume:
while (pos < 512)
while (pos < JOURNAL_BLOCK_SIZE)
{
journal_entry *je = (journal_entry*)(buf + proc_pos - done_pos + pos);
if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
@ -432,13 +432,13 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
if (next_free + je->small_write.len > bs->journal.len)
{
// data continues from the beginning of the journal
next_free = 512;
next_free = JOURNAL_BLOCK_SIZE;
}
uint64_t location = next_free;
next_free += je->small_write.len;
if (next_free >= bs->journal.len)
{
next_free = 512;
next_free = JOURNAL_BLOCK_SIZE;
}
if (location != je->small_write.data_offset)
{
@ -479,7 +479,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
{
// journal entry is corrupt, stop here
// interesting thing is that we must clear the corrupt entry if we're not readonly
memset(buf + proc_pos - done_pos + pos, 0, 512 - pos);
memset(buf + proc_pos - done_pos + pos, 0, JOURNAL_BLOCK_SIZE - pos);
bs->journal.next_free = prev_free;
init_write_buf = buf + proc_pos - done_pos;
init_write_sector = proc_pos;
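To summarize the replay logic these hunks touch: the journal is scanned one JOURNAL_BLOCK_SIZE block at a time, entries are packed back to back inside a block, and both the block cursor and small-write data wrap back to JOURNAL_BLOCK_SIZE because block 0 holds the superblock. A condensed, hedged skeleton of that scan (real entry handling such as data relocation and dirty map updates is elided):

    // Hypothetical condensed skeleton of the per-block scan in handle_journal_part().
    uint64_t proc_pos = next_free, pos = 0;
    next_free += JOURNAL_BLOCK_SIZE;
    if (next_free >= bs->journal.len)
        next_free = JOURNAL_BLOCK_SIZE;          // wrap past the end, skipping the superblock
    while (pos < JOURNAL_BLOCK_SIZE)
    {
        journal_entry *je = (journal_entry*)(buf + proc_pos - done_pos + pos);
        if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32)
            break;                               // corrupt or unused space: stop scanning this block
        // ... apply the entry (JE_START, small/big writes, etc.) ...
        pos += je->size;                         // entries are packed back to back within the block
    }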

View File

@ -31,8 +31,7 @@ class blockstore_init_journal
uint64_t entries_loaded = 0;
uint32_t crc32_last = 0;
bool started = false;
// FIXME: use DISK_ALIGNMENT everywhere
uint64_t next_free = 512;
uint64_t next_free = JOURNAL_BLOCK_SIZE;
std::vector<bs_init_journal_done> done;
uint64_t journal_pos = 0;
uint64_t continue_pos = 0;

View File

@ -15,7 +15,7 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
{
while (1)
{
int fits = (512 - next_in_pos) / size;
int fits = (JOURNAL_BLOCK_SIZE - next_in_pos) / size;
if (fits > 0)
{
required -= fits;
@ -26,10 +26,10 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
{
break;
}
next_pos = next_pos+512;
next_pos = next_pos + JOURNAL_BLOCK_SIZE;
if (next_pos >= bs->journal.len)
{
next_pos = 512;
next_pos = JOURNAL_BLOCK_SIZE;
right_dir = false;
}
next_in_pos = 0;
@ -49,11 +49,11 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
next_pos = next_pos + data_after;
if (next_pos > bs->journal.len)
{
next_pos = 512 + data_after;
next_pos = JOURNAL_BLOCK_SIZE + data_after;
right_dir = false;
}
}
if (!right_dir && next_pos >= bs->journal.used_start-512)
if (!right_dir && next_pos >= bs->journal.used_start-JOURNAL_BLOCK_SIZE)
{
// No space in the journal. Wait until used_start changes.
PRIV(op)->wait_for = WAIT_JOURNAL;
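check_available() essentially asks whether 'required' journal entries of 'size' bytes each (plus optional small-write data) fit before running into used_start. Entries never cross block boundaries, so the per-block capacity part reduces to a ceiling division. A hedged, simplified helper that ignores the wrap and used_start handling the real function also does:

    #include <cstdint>
    #define JOURNAL_BLOCK_SIZE 512

    // Hypothetical helper: whole journal blocks needed for 'required' entries of
    // 'size' bytes, given 'in_sector_pos' bytes already used in the open block.
    static int blocks_needed(int required, uint32_t size, int in_sector_pos)
    {
        int fits_now = (JOURNAL_BLOCK_SIZE - in_sector_pos) / size;
        if (required <= fits_now)
            return 0;                               // everything fits into the open block
        int per_block = JOURNAL_BLOCK_SIZE / size;
        return (required - fits_now + per_block - 1) / per_block; // ceiling division
    }

For example, with size = 80, in_sector_pos = 400 and required = 8: one entry still fits in the open block, each fresh block holds six, so two more blocks are needed.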
@ -66,7 +66,7 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size)
{
if (512 - journal.in_sector_pos < size)
if (JOURNAL_BLOCK_SIZE - journal.in_sector_pos < size)
{
// Move to the next journal sector
if (journal.sector_info[journal.cur_sector].usage_count > 0)
@ -76,15 +76,15 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
}
journal.sector_info[journal.cur_sector].offset = journal.next_free;
journal.in_sector_pos = 0;
journal.next_free = (journal.next_free+512) < journal.len ? journal.next_free + 512 : 512;
journal.next_free = (journal.next_free+JOURNAL_BLOCK_SIZE) < journal.len ? journal.next_free + JOURNAL_BLOCK_SIZE : JOURNAL_BLOCK_SIZE;
memset(journal.inmemory
? journal.buffer + journal.sector_info[journal.cur_sector].offset
: journal.sector_buf + 512*journal.cur_sector, 0, 512);
: journal.sector_buf + JOURNAL_BLOCK_SIZE*journal.cur_sector, 0, JOURNAL_BLOCK_SIZE);
}
journal_entry *je = (struct journal_entry*)(
(journal.inmemory
? journal.buffer + journal.sector_info[journal.cur_sector].offset
: journal.sector_buf + 512*journal.cur_sector) + journal.in_sector_pos
: journal.sector_buf + JOURNAL_BLOCK_SIZE*journal.cur_sector) + journal.in_sector_pos
);
journal.in_sector_pos += size;
je->magic = JOURNAL_MAGIC;
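The next_free expression above is what keeps block 0 reserved for the superblock: the cursor advances in JOURNAL_BLOCK_SIZE steps and wraps to JOURNAL_BLOCK_SIZE, never to 0. A small self-contained illustration of the wrap (the journal length is a made-up value):

    #include <cassert>
    #include <cstdint>

    #define JOURNAL_BLOCK_SIZE 512

    int main()
    {
        uint64_t len = 16*1024*1024;                   // assumed journal length
        uint64_t next_free = len - JOURNAL_BLOCK_SIZE; // last block of the journal
        // Same expression as in prefill_single_journal_entry():
        next_free = (next_free + JOURNAL_BLOCK_SIZE) < len ? next_free + JOURNAL_BLOCK_SIZE : JOURNAL_BLOCK_SIZE;
        assert(next_free == JOURNAL_BLOCK_SIZE);       // wrapped past the end, skipping block 0
        return 0;
    }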
@ -101,8 +101,8 @@ void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::f
data->iov = (struct iovec){
(journal.inmemory
? journal.buffer + journal.sector_info[journal.cur_sector].offset
: journal.sector_buf + 512*journal.cur_sector),
512
: journal.sector_buf + JOURNAL_BLOCK_SIZE*journal.cur_sector),
JOURNAL_BLOCK_SIZE
};
data->callback = cb;
my_uring_prep_writev(

View File

@ -117,8 +117,8 @@ struct journal_t
void *buffer = NULL;
uint64_t offset, len;
uint64_t next_free = 512;
uint64_t used_start = 512;
uint64_t next_free = JOURNAL_BLOCK_SIZE;
uint64_t used_start = JOURNAL_BLOCK_SIZE;
uint32_t crc32_last = 0;
// Current sector(s) used for writing
@ -126,7 +126,7 @@ struct journal_t
journal_sector_info_t *sector_info = NULL;
uint64_t sector_count;
int cur_sector = 0;
int in_sector_pos = 512; // no free space because sector is initially unmapped
int in_sector_pos = JOURNAL_BLOCK_SIZE; // no free space because sector is initially unmapped
// Used sector map
// May use ~ 80 MB per 1 GB of used journal space in the worst case
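Two fields here are worth a note: in_sector_pos starts at JOURNAL_BLOCK_SIZE so that the first prefill_single_journal_entry() call is forced to open a new sector, and when the journal is not kept in memory, sector_buf holds one JOURNAL_BLOCK_SIZE slot per in-flight sector, matching the sector_count * JOURNAL_BLOCK_SIZE allocation in open_journal() later in this commit. A hedged helper showing how the current sector's in-memory copy is addressed (this function does not exist in the source; it mirrors the expressions used in prepare_journal_sector_write()):

    // Hypothetical helper, mirroring prepare_journal_sector_write() / prefill_single_journal_entry().
    static inline void *cur_sector_buf(journal_t & journal)
    {
        return journal.inmemory
            ? journal.buffer + journal.sector_info[journal.cur_sector].offset
            : journal.sector_buf + JOURNAL_BLOCK_SIZE * journal.cur_sector;
    }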

View File

@ -45,7 +45,7 @@ void blockstore_impl_t::calc_lengths(blockstore_config_t & config)
}
// required metadata size
block_count = data_len / block_size;
meta_len = ((block_count - 1 + 512 / sizeof(clean_disk_entry)) / (512 / sizeof(clean_disk_entry))) * 512;
meta_len = ((block_count - 1 + META_BLOCK_SIZE / sizeof(clean_disk_entry)) / (META_BLOCK_SIZE / sizeof(clean_disk_entry))) * META_BLOCK_SIZE;
if (meta_area < meta_len)
{
throw std::runtime_error("Metadata area is too small, need at least "+std::to_string(meta_len)+" bytes");
@ -58,7 +58,7 @@ void blockstore_impl_t::calc_lengths(blockstore_config_t & config)
inmemory_meta = config["inmemory_metadata"] != "false";
if (inmemory_meta)
{
metadata_buffer = memalign(512, meta_len);
metadata_buffer = memalign(MEM_ALIGNMENT, meta_len);
if (!metadata_buffer)
throw std::runtime_error("Failed to allocate memory for metadata");
}
@ -78,7 +78,7 @@ void blockstore_impl_t::calc_lengths(blockstore_config_t & config)
}
if (journal.inmemory)
{
journal.buffer = memalign(512, journal.len);
journal.buffer = memalign(MEM_ALIGNMENT, journal.len);
if (!journal.buffer)
throw std::runtime_error("Failed to allocate memory for journal");
}
@ -200,7 +200,7 @@ void blockstore_impl_t::open_journal(blockstore_config_t & config)
if (config["inmemory_journal"] == "false")
{
journal.inmemory = false;
journal.sector_buf = (uint8_t*)memalign(512, journal.sector_count * 512);
journal.sector_buf = (uint8_t*)memalign(MEM_ALIGNMENT, journal.sector_count * JOURNAL_BLOCK_SIZE);
if (!journal.sector_buf)
throw std::bad_alloc();
}

View File

@ -136,9 +136,9 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
// Small (journaled) write
// First check if the journal has sufficient space
// FIXME Always two SQEs for now. Although it's possible to send 1 sometimes
//two_sqes = (512 - journal.in_sector_pos < sizeof(struct journal_entry_small_write)
//two_sqes = (JOURNAL_BLOCK_SIZE - journal.in_sector_pos < sizeof(struct journal_entry_small_write)
// ? (journal.len - next_pos < op->len)
// : (journal.sector_info[journal.cur_sector].offset + 512 != journal.next_free ||
// : (journal.sector_info[journal.cur_sector].offset + JOURNAL_BLOCK_SIZE != journal.next_free ||
// journal.len - next_pos < op->len);
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, 1, sizeof(journal_entry_small_write), op->len))
@ -163,7 +163,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
printf("journal offset %lu is used by %lu:%lu v%lu\n", dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version);
#endif
// Figure out where data will be
journal.next_free = (journal.next_free + op->len) <= journal.len ? journal.next_free : 512;
journal.next_free = (journal.next_free + op->len) <= journal.len ? journal.next_free : JOURNAL_BLOCK_SIZE;
je->oid = op->oid;
je->version = op->version;
je->offset = op->offset;
@ -199,7 +199,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
dirty_it->second.state = ST_J_SUBMITTED;
journal.next_free += op->len;
if (journal.next_free >= journal.len)
journal.next_free = 512;
journal.next_free = JOURNAL_BLOCK_SIZE;
// Remember small write as unsynced
unsynced_small_writes.push_back((obj_ver_id){
.oid = op->oid,
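For small (journaled) writes the data payload follows the same wrap rule as the entries: if next_free + op->len would run past the end of the journal, the data is placed at JOURNAL_BLOCK_SIZE instead, and next_free then advances past it, wrapping again if needed. A short self-contained illustration of that placement decision (the lengths are made up):

    #include <cassert>
    #include <cstdint>

    #define JOURNAL_BLOCK_SIZE 512

    int main()
    {
        uint64_t journal_len = 16*1024*1024;      // assumed journal size
        uint64_t next_free = journal_len - 2048;  // 2 KB left before the end
        uint64_t op_len = 4096;                   // 4 KB small-write payload
        // Same decision as dequeue_write(): wrap if the payload would not fit.
        uint64_t location = (next_free + op_len) <= journal_len ? next_free : JOURNAL_BLOCK_SIZE;
        assert(location == JOURNAL_BLOCK_SIZE);   // wrapped to just after the superblock
        next_free = location + op_len;
        assert(next_free == JOURNAL_BLOCK_SIZE + 4096);
        return 0;
    }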