@@ -188,85 +188,13 @@ bool journal_flusher_co::loop()
#ifdef BLOCKSTORE_DEBUG
    printf("Flushing %lu:%lu v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif
-    dirty_it = dirty_end;
    flusher->active_flushers++;
-    v.clear();
-    wait_count = 0;
-    copy_count = 0;
-    clean_loc = UINT64_MAX;
-    has_delete = false;
-    skip_copy = false;
-    while (1)
+    resume_1:
+    // Scan dirty versions of the object
+    if (!scan_dirty(1))
    {
-        if (dirty_it->second.state == ST_J_STABLE && !skip_copy)
-        {
-            // First we submit all reads
-            offset = dirty_it->second.offset;
-            end_offset = dirty_it->second.offset + dirty_it->second.len;
-            it = v.begin();
-            while (1)
-            {
-                for (; it != v.end(); it++)
-                    if (it->offset >= offset)
-                        break;
-                if (it == v.end() || it->offset > offset && it->len > 0)
-                {
-                    submit_offset = dirty_it->second.location + offset - dirty_it->second.offset;
-                    submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset;
-                    it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(MEM_ALIGNMENT, submit_len) });
-                    copy_count++;
-                    if (bs->journal.inmemory)
-                    {
-                        // Take it from memory
-                        memcpy(v.back().buf, bs->journal.buffer + submit_offset, submit_len);
-                    }
-                    else
-                    {
-                        // Read it from disk
-                        await_sqe(1);
-                        data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
-                        data->callback = simple_callback_r;
-                        my_uring_prep_readv(
-                            sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset
-                        );
-                        wait_count++;
-                    }
-                }
-                offset = it->offset+it->len;
-                if (it == v.end() || offset >= end_offset)
-                    break;
-            }
-        }
-        else if (dirty_it->second.state == ST_D_STABLE && !skip_copy)
-        {
-            // There is an unflushed big write. Copy small writes in its position
-            clean_loc = dirty_it->second.location;
-            skip_copy = true;
-        }
-        else if (dirty_it->second.state == ST_DEL_STABLE && !skip_copy)
-        {
-            // There is an unflushed delete
-            has_delete = true;
-            skip_copy = true;
-        }
-        else if (!IS_STABLE(dirty_it->second.state))
-        {
-            char err[1024];
-            snprintf(
-                err, 1024, "BUG: Unexpected dirty_entry %lu:%lu v%lu state during flush: %d",
-                dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
-            );
-            throw std::runtime_error(err);
-        }
-        if (dirty_it == bs->dirty_db.begin())
-        {
-            break;
-        }
-        dirty_it--;
-        if (dirty_it->first.oid != cur.oid)
-        {
-            break;
-        }
+        wait_state += 1;
+        return false;
    }
    if (copy_count == 0 && clean_loc == UINT64_MAX && !has_delete)
    {
@@ -283,16 +211,13 @@ bool journal_flusher_co::loop()
        return true;
    }
    // Find it in clean_db
-    {
-        auto clean_it = bs->clean_db.find(cur.oid);
-        old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX);
-        old_clean_ver = (clean_it != bs->clean_db.end() ? clean_it->second.version : 0);
-    }
+    clean_it = bs->clean_db.find(cur.oid);
+    old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX);
    if (clean_loc == UINT64_MAX)
    {
        if (copy_count > 0 && has_delete || old_clean_loc == UINT64_MAX)
        {
-            // Object not present at all. This is a bug.
+            // Object not allocated. This is a bug.
            char err[1024];
            snprintf(
                err, 1024, "BUG: Object %lu:%lu v%lu that we are trying to flush is not allocated on the data device",
@@ -301,10 +226,10 @@ bool journal_flusher_co::loop()
            throw std::runtime_error(err);
        }
        else
        {
            clean_loc = old_clean_loc;
        }
    }
    else
        has_delete = false;
    // Also we need to submit metadata read(s). We do read-modify-write cycle(s) for every operation.
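    // (I.e., the target entry occupies clean_entry_size bytes inside a META_BLOCK_SIZE metadata block,
    // so unless inmemory_meta is set the whole block is read into a buffer, the single entry is
    // rewritten there, and the block is written back as a whole.)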
    resume_2:
    if (!modify_meta_read(clean_loc, meta_new, 2))
@@ -339,9 +264,24 @@ bool journal_flusher_co::loop()
            meta_old.it->second.state = 1;
            bs->ringloop->wakeup();
        }
-    // Reads completed, submit writes
+    // Reads completed, submit writes and set bitmap bits
+    if (bs->clean_entry_bitmap_size)
+    {
+        new_clean_bitmap = (bs->inmemory_meta
+            ? meta_new.buf + meta_new.pos*bs->clean_entry_size + sizeof(clean_disk_entry)
+            : bs->clean_bitmap + (clean_loc >> bs->block_order)*bs->clean_entry_bitmap_size);
+        if (clean_init_bitmap)
+        {
+            memset(new_clean_bitmap, 0, bs->clean_entry_bitmap_size);
+            bitmap_set(new_clean_bitmap, clean_bitmap_offset, clean_bitmap_len);
+        }
+    }
    for (it = v.begin(); it != v.end(); it++)
    {
+        if (new_clean_bitmap)
+        {
+            bitmap_set(new_clean_bitmap, it->offset, it->len);
+        }
        await_sqe(4);
        data->iov = (struct iovec){ it->buf, (size_t)it->len };
        data->callback = simple_callback_w;
@@ -374,7 +314,7 @@ bool journal_flusher_co::loop()
            wait_state = 5;
            return false;
        }
-        ((clean_disk_entry*)meta_old.buf)[meta_old.pos] = { 0 };
+        memset(meta_old.buf + meta_old.pos*bs->clean_entry_size, 0, bs->clean_entry_size);
        await_sqe(15);
        data->iov = (struct iovec){ meta_old.buf, META_BLOCK_SIZE };
        data->callback = simple_callback_w;
@@ -383,12 +323,20 @@ bool journal_flusher_co::loop()
        );
        wait_count++;
    }
-    ((clean_disk_entry*)meta_new.buf)[meta_new.pos] = has_delete
-        ? (clean_disk_entry){ 0 }
-        : (clean_disk_entry){
-            .oid = cur.oid,
-            .version = cur.version,
-        };
+    if (has_delete)
+    {
+        memset(meta_new.buf + meta_new.pos*bs->clean_entry_size, 0, bs->clean_entry_size);
+    }
+    else
+    {
+        clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
+        new_entry->oid = cur.oid;
+        new_entry->version = cur.version;
+        if (!bs->inmemory_meta)
+        {
+            memcpy(&new_entry->bitmap, new_clean_bitmap, bs->clean_entry_bitmap_size);
+        }
+    }
    await_sqe(6);
    data->iov = (struct iovec){ meta_new.buf, META_BLOCK_SIZE };
    data->callback = simple_callback_w;
@@ -484,15 +432,109 @@ bool journal_flusher_co::loop()
    return true;
}

bool journal_flusher_co::scan_dirty(int wait_base)
{
    if (wait_state == wait_base)
    {
        goto resume_0;
    }
    dirty_it = dirty_end;
    v.clear();
    wait_count = 0;
    copy_count = 0;
    clean_loc = UINT64_MAX;
    has_delete = false;
    skip_copy = false;
    clean_init_bitmap = false;
    while (1)
    {
        if (dirty_it->second.state == ST_J_STABLE && !skip_copy)
        {
            // First we submit all reads
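            // v is kept sorted by offset and already holds the data copied for newer dirty versions
            // (we walk the versions from newest to oldest), so for this version's interval
            // [offset, end_offset) we only allocate buffers and copy/read journal data for the gaps
            // that are not covered yet. Illustrative example: if v already contains [0, 4096) and
            // this version covers [0, 12288), only [4096, 12288) is fetched.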
            offset = dirty_it->second.offset;
            end_offset = dirty_it->second.offset + dirty_it->second.len;
            it = v.begin();
            while (1)
            {
                for (; it != v.end(); it++)
                    if (it->offset >= offset)
                        break;
                if (it == v.end() || it->offset > offset && it->len > 0)
                {
                    submit_offset = dirty_it->second.location + offset - dirty_it->second.offset;
                    submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset;
                    it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(MEM_ALIGNMENT, submit_len) });
                    copy_count++;
                    if (bs->journal.inmemory)
                    {
                        // Take it from memory
                        memcpy(it->buf, bs->journal.buffer + submit_offset, submit_len);
                    }
                    else
                    {
                        // Read it from disk
                        await_sqe(0);
                        data->iov = (struct iovec){ it->buf, (size_t)submit_len };
                        data->callback = simple_callback_r;
                        my_uring_prep_readv(
                            sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset
                        );
                        wait_count++;
                    }
                }
                offset = it->offset+it->len;
                if (it == v.end() || offset >= end_offset)
                    break;
            }
        }
        else if (dirty_it->second.state == ST_D_STABLE && !skip_copy)
        {
            // There is an unflushed big write. Copy small writes in its position
            clean_loc = dirty_it->second.location;
            clean_init_bitmap = true;
            clean_bitmap_offset = dirty_it->second.offset;
            clean_bitmap_len = dirty_it->second.len;
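            // Remember the extent of the big write so that the caller can initialise the clean
            // bitmap from it (memset() + bitmap_set() under clean_init_bitmap in loop()).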
            skip_copy = true;
        }
        else if (dirty_it->second.state == ST_DEL_STABLE && !skip_copy)
        {
            // There is an unflushed delete
            has_delete = true;
            skip_copy = true;
        }
        else if (!IS_STABLE(dirty_it->second.state))
        {
            char err[1024];
            snprintf(
                err, 1024, "BUG: Unexpected dirty_entry %lu:%lu v%lu state during flush: %d",
                dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
            );
            throw std::runtime_error(err);
        }
        if (dirty_it == bs->dirty_db.begin())
        {
            break;
        }
        dirty_it--;
        if (dirty_it->first.oid != cur.oid)
        {
            break;
        }
    }
    return true;
}

bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base)
{
    if (wait_state == wait_base)
    {
        goto resume_0;
    }
    // If we don't keep all metadata in memory all the time, we must check whether the same sector is already buffered.
    // Yet another option would be to use LSM trees for metadata, but that complicates everything a lot,
    // so I'll avoid it as long as I can.
-    wr.sector = ((meta_loc >> bs->block_order) / (META_BLOCK_SIZE / sizeof(clean_disk_entry))) * META_BLOCK_SIZE;
-    wr.pos = ((meta_loc >> bs->block_order) % (META_BLOCK_SIZE / sizeof(clean_disk_entry)));
+    wr.sector = ((meta_loc >> bs->block_order) / (META_BLOCK_SIZE / bs->clean_entry_size)) * META_BLOCK_SIZE;
+    wr.pos = ((meta_loc >> bs->block_order) % (META_BLOCK_SIZE / bs->clean_entry_size));
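    // For illustration (hypothetical sizes): with META_BLOCK_SIZE = 4096 and bs->clean_entry_size = 32
    // there are 128 entries per metadata block, so entry number 300 (meta_loc >> bs->block_order == 300)
    // maps to wr.sector = (300 / 128) * 4096 = 8192 and wr.pos = 300 % 128 = 44.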
    if (bs->inmemory_meta)
    {
        wr.buf = bs->metadata_buffer + wr.sector;
@@ -643,3 +685,35 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)
    }
    return true;
}

void journal_flusher_co::bitmap_set(void *bitmap, uint64_t start, uint64_t len)
{
    if (start == 0)
    {
        if (len == 32*BITMAP_GRANULARITY)
        {
            // Whole 32-granule range: set all bits with a single store
            *((uint32_t*)bitmap) = UINT32_MAX;
            return;
        }
        else if (len == 64*BITMAP_GRANULARITY)
        {
            // Whole 64-granule range: set all bits with a single store
            *((uint64_t*)bitmap) = UINT64_MAX;
            return;
        }
    }
    unsigned bit_start = start / BITMAP_GRANULARITY;
    unsigned bit_end = ((start + len) + BITMAP_GRANULARITY - 1) / BITMAP_GRANULARITY;
    while (bit_start < bit_end)
    {
        if (!(bit_start & 7) && bit_end >= bit_start+8)
        {
            // Set a whole byte at once
            ((uint8_t*)bitmap)[bit_start / 8] = 0xFF;
            bit_start += 8;
        }
        else
        {
            ((uint8_t*)bitmap)[bit_start / 8] |= 1 << (bit_start % 8);
            bit_start++;
        }
    }
}
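// Usage sketch (illustrative values): with BITMAP_GRANULARITY = 4096, bitmap_set(bmp, 8192, 12288)
// computes bit_start = 2 and bit_end = 5 and sets bits 2..4, i.e. the three 4 KB granules at
// offsets 8192, 12288 and 16384. The start == 0 branches cover a full 32- or 64-granule range
// with a single word store.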