2019-11-13 17:45:37 +03:00
|
|
|
#include "blockstore.h"
|
|
|
|
|
|
|
|
// Construct the flusher pool: <flusher_count> coroutine workers attached to
// the given blockstore. Each worker gets back-pointers to the blockstore and
// to this pool so it can pull work from the shared flush queue.
journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs)
{
    this->flusher_count = flusher_count;
    this->bs = bs;
    // No flushes are in progress yet
    this->active_flushers = 0;
    this->active_until_sync = 0;
    this->sync_required = true;
    // Batch syncs: with more than one worker, fsync after ~half of them are ready
    if (flusher_count == 1)
        this->sync_threshold = 1;
    else
        this->sync_threshold = flusher_count / 2;
    co = new journal_flusher_co[flusher_count];
    for (int i = 0; i < flusher_count; i++)
    {
        journal_flusher_co & worker = co[i];
        worker.bs = bs;
        worker.flusher = this;
    }
}
|
|
|
|
|
2019-11-14 02:29:34 +03:00
|
|
|
// Initialize one flusher coroutine.
// wait_state == 0 means "start from the beginning" in loop().
// simple_callback is the shared io_uring completion handler used for all
// plain data reads/writes and fsyncs submitted by this coroutine: it only
// checks for errors and decrements the outstanding-operation counter.
journal_flusher_co::journal_flusher_co()
{
    wait_state = 0;
    simple_callback = [this](ring_data_t* data)
    {
        if (data->res < 0)
        {
            // An I/O error at this point is unrecoverable: in-memory state
            // already assumes the operation will succeed.
            // Throw by value (not `throw new`) so it can be caught as
            // `const std::exception &` and isn't leaked.
            throw std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
        }
        wait_count--;
    };
}
|
|
|
|
|
2019-11-13 17:45:37 +03:00
|
|
|
// Tear down the pool: release the worker array allocated in the constructor.
// Callers are expected to have drained all in-flight flushes before this.
journal_flusher_t::~journal_flusher_t()
{
    delete[] this->co;
}
|
|
|
|
|
|
|
|
void journal_flusher_t::loop()
|
|
|
|
{
|
|
|
|
if (!active_flushers && !flush_queue.size())
|
|
|
|
{
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
for (int i = 0; i < flusher_count; i++)
|
|
|
|
{
|
|
|
|
co[i].loop();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-14 02:29:34 +03:00
|
|
|
// Coroutine-style "await" for an io_uring submission queue entry.
// Expands to a resume label plus an SQE acquisition attempt: if the
// submission queue is currently full (get_sqe() returns NULL), it records
// the label in wait_state and returns from loop(); the next loop() call
// jumps back to resume_<label> and retries. On success, `sqe` and its
// associated per-request `data` (ring_data_t) are left ready for use.
// NOTE: only /* */ comments may be added inside the macro body itself,
// so the explanation lives here.
#define await_sqe(label) \
    resume_##label:\
    sqe = bs->get_sqe();\
    if (!sqe)\
    {\
        wait_state = label;\
        return;\
    }\
    data = ((ring_data_t*)sqe->user_data);
|
|
|
|
|
2019-11-13 17:45:37 +03:00
|
|
|
// One step of the flush coroutine, implemented as a goto-based state machine:
// wait_state selects the resume_<N> label to jump to, so the function can
// "suspend" (return) whenever the io_uring submission queue is full or
// completions are pending, and continue from the same point on the next call.
// Overall flow for one flushed object version (cur):
//   1. Walk dirty_db entries for cur.oid backwards, submitting journal reads
//      for stable small writes (ST_J_STABLE) and noting the data location of
//      a stable big write (ST_D_STABLE) if present.
//   2. Determine the target location in clean_db, read-modify-write the
//      512-byte metadata sector.
//   3. Write the collected buffers to the data device, write metadata back.
//   4. Optionally participate in a batched fsync.
//   5. Update clean_db/dirty_db, free old allocations and journal sectors.
void journal_flusher_co::loop()
{
    // This is much better than implementing the whole function as an FSM
    // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
    if (wait_state == 1)
        goto resume_1;
    else if (wait_state == 2)
        goto resume_2;
    else if (wait_state == 3)
        goto resume_3;
    else if (wait_state == 4)
        goto resume_4;
    else if (wait_state == 5)
        goto resume_5;
    else if (wait_state == 6)
        goto resume_6;
    else if (wait_state == 7)
        goto resume_7;
    else if (wait_state == 8)
        goto resume_8;
    else if (wait_state == 9)
        goto resume_9;
    else if (wait_state == 10)
        goto resume_10;
    else if (wait_state == 11)
        goto resume_11;
resume_0:
    // Fresh start: take the next object version to flush from the queue
    if (!flusher->flush_queue.size())
        return;
    cur = flusher->flush_queue.front();
    flusher->flush_queue.pop_front();
    dirty_end = bs->dirty_db.find(cur);
    if (dirty_end != bs->dirty_db.end())
    {
        dirty_it = dirty_end;
        flusher->active_flushers++;
        flusher->active_until_sync++;
        // v collects the data ranges (and read buffers) to be written out
        v.clear();
        wait_count = 0;
        clean_loc = UINT64_MAX;
        skip_copy = false;
        // Walk dirty entries for this object from <cur> backwards in version order
        do
        {
            if (dirty_it->second.state == ST_J_STABLE)
            {
                // First we submit all reads
                offset = dirty_it->second.offset;
                len = dirty_it->second.len;
                // Merge this journal extent into v, reading only the parts
                // not already covered by a newer (previously seen) version
                it = v.begin();
                while (1)
                {
                    for (; it != v.end(); it++)
                        if (it->offset >= offset)
                            break;
                    if (it == v.end() || it->offset > offset)
                    {
                        // NOTE(review): when it == v.end(), the next line still
                        // dereferences `it` — looks like an end-iterator
                        // dereference (UB); verify and likely special-case it.
                        submit_len = it->offset >= offset+len ? len : it->offset-offset;
                        await_sqe(1);
                        v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
                        data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
                        data->callback = simple_callback;
                        // Read the small-write payload from the journal area
                        io_uring_prep_readv(
                            sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset
                        );
                        wait_count++;
                    }
                    if (it == v.end() || it->offset+it->len >= offset+len)
                    {
                        break;
                    }
                }
                // So subsequent stabilizers don't flush the entry again
                dirty_it->second.state = ST_J_MOVE_READ_SUBMITTED;
            }
            else if (dirty_it->second.state == ST_D_STABLE)
            {
                // There is an unflushed big write. Overwrite it with small writes
                if (!skip_copy)
                {
                    // The big write's location becomes the flush target
                    clean_loc = dirty_it->second.location;
                }
                skip_copy = true;
            }
            else if (IS_STABLE(dirty_it->second.state))
            {
                // Other coroutine is already flushing it, stop
                break;
            }
            else
            {
                throw new std::runtime_error("BUG: Unexpected dirty_entry state during flush: " + std::to_string(dirty_it->second.state));
            }
            dirty_start = dirty_it;
            dirty_it--;
        } while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
        if (wait_count == 0 && clean_loc == UINT64_MAX)
        {
            // Nothing to flush
            flusher->active_flushers--;
            flusher->active_until_sync--;
            wait_state = 0;
            goto resume_0;
        }
        if (clean_loc == UINT64_MAX)
        {
            // Find it in clean_db
            clean_it = bs->clean_db.find(cur.oid);
            if (clean_it == bs->clean_db.end())
            {
                // Object not present at all. This is a bug.
                throw new std::runtime_error("BUG: Object we are trying to flush not allocated on the data device");
            }
            else
                clean_loc = clean_it->second.location;
        }
        // NOTE(review): on the ST_D_STABLE path above, clean_it is not
        // (re)assigned in this iteration, but it is consulted near the end of
        // the function — presumably relies on a previous value; verify.
        // Also we need to submit the metadata read. We do a read-modify-write for every operation.
        // But we must check if the same sector is already in memory.
        // Another option is to keep all raw metadata in memory all the time. Maybe I'll do it sometime...
        // And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot,
        // so I'll avoid it as long as I can.
        meta_sector = ((clean_loc >> bs->block_order) / (512 / sizeof(clean_disk_entry))) * 512;
        meta_pos = ((clean_loc >> bs->block_order) % (512 / sizeof(clean_disk_entry)));
        meta_it = flusher->meta_sectors.find(meta_sector);
        if (meta_it == flusher->meta_sectors.end())
        {
            // Not in memory yet, read it
            meta_it = flusher->meta_sectors.emplace(meta_sector, (meta_sector_t){
                .offset = meta_sector,
                .len = 512,
                .state = 0, // 0 = not read yet
                .buf = memalign(512, 512),
                .usage_count = 1,
            }).first;
            await_sqe(2);
            data->iov = (struct iovec){ meta_it->second.buf, 512 };
            // Custom callback: besides the usual bookkeeping, mark the
            // cached metadata sector as "read" so other flushers can use it
            data->callback = [this](ring_data_t* data)
            {
                if (data->res < 0)
                {
                    throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
                }
                meta_it->second.state = 1;
                wait_count--;
            };
            io_uring_prep_readv(
                sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
            );
            wait_count++;
        }
        else
            meta_it->second.usage_count++;
        wait_state = 3;
    resume_3:
        // Suspend here until all reads submitted above have completed
        if (wait_count > 0)
            return;
        // Reads completed, submit writes
        for (it = v.begin(); it != v.end(); it++)
        {
            await_sqe(4);
            data->iov = (struct iovec){ it->buf, (size_t)it->len };
            data->callback = simple_callback;
            io_uring_prep_writev(
                sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
            );
            wait_count++;
        }
        // And a metadata write
    resume_5:
        if (meta_it->second.state == 0)
        {
            // metadata sector is still being read, wait for it
            wait_state = 5;
            return;
        }
        // Patch this object's entry inside the cached 512-byte metadata sector
        *((clean_disk_entry*)meta_it->second.buf + meta_pos) = {
            .oid = cur.oid,
            .version = cur.version,
        };
        // I consider unordered writes to data & metadata safe here, because
        // "dirty" entries always override "clean" entries in our case
        await_sqe(6);
        data->iov = (struct iovec){ meta_it->second.buf, 512 };
        data->callback = simple_callback;
        io_uring_prep_writev(
            sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
        );
        wait_count++;
        wait_state = 7;
    resume_7:
        // Suspend until data+metadata writes complete
        if (wait_count > 0)
            return;
        // Done, free all buffers
        meta_it->second.usage_count--;
        if (meta_it->second.usage_count == 0)
        {
            free(meta_it->second.buf);
            flusher->meta_sectors.erase(meta_it);
        }
        for (it = v.begin(); it != v.end(); it++)
        {
            free(it->buf);
        }
        v.clear();
        flusher->active_until_sync--;
        if (flusher->sync_required)
        {
            // And sync everything (in batches - not per each operation!)
            // Join the last (newest) pending sync batch, or open a new one
            cur_sync = flusher->syncs.end();
            if (cur_sync == flusher->syncs.begin())
                cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){ .ready_count = 0, .state = 0 });
            else
                cur_sync--;
            cur_sync->ready_count++;
            // Fire the fsync when the batch is big enough, or when nothing
            // else is going to join it (no active flushers, empty queue).
            // NOTE(review): `&&` binds tighter than `||` here — the grouping
            // is (threshold reached) || (idle && queue empty); presumably
            // intended, but parentheses would make it explicit.
            if (cur_sync->ready_count >= flusher->sync_threshold ||
                !flusher->active_until_sync && !flusher->flush_queue.size())
            {
                // Sync batch is ready. Do it.
                await_sqe(9);
                data->callback = simple_callback;
                io_uring_prep_fsync(sqe, bs->data_fd, 0);
                wait_count++;
                if (bs->meta_fd != bs->data_fd)
                {
                    // Metadata lives on a separate device/fd — sync it too
                    await_sqe(10);
                    data->callback = simple_callback;
                    io_uring_prep_fsync(sqe, bs->meta_fd, 0);
                    wait_count++;
                }
                wait_state = 11;
            resume_11:
                if (wait_count > 0)
                    return;
                // Sync completed. All previous coroutines waiting for it must be resumed
                cur_sync->state = 1;
            }
            // Wait until someone else sends and completes a sync.
        resume_8:
            if (!cur_sync->state)
            {
                wait_state = 8;
                return;
            }
            // Leave the batch; the last participant frees it
            cur_sync->ready_count--;
            if (cur_sync->ready_count == 0)
            {
                flusher->syncs.erase(cur_sync);
            }
        }
        // Update clean_db and dirty_db, free old data locations
        if (clean_it != bs->clean_db.end() && clean_it->second.location != clean_loc)
        {
            // The object moved: release its previous data block
            allocator_set(bs->data_alloc, clean_it->second.location >> bs->block_order, false);
        }
        bs->clean_db[cur.oid] = {
            .version = cur.version,
            .location = clean_loc,
        };
        for (dirty_it = dirty_start; dirty_it != dirty_end; dirty_it++)
        {
            if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
            {
                allocator_set(bs->data_alloc, dirty_it->second.location >> bs->block_order, false);
            }
            // Drop this entry's reference to its journal sector
            int used = --bs->journal.used_sectors[dirty_it->second.journal_sector];
            // NOTE(review): erasing at used == 1 (not 0) leaves/removes one
            // reference early or late depending on the counting convention —
            // confirm whether this should compare against 0.
            if (used == 1)
            {
                bs->journal.used_sectors.erase(dirty_it->second.journal_sector);
            }
        }
        // Then, basically, remove the whole version range from dirty_db...
        // FIXME not until dirty_start, until other object. And wait for previous flushes.
        bs->dirty_db.erase(dirty_start, std::next(dirty_end));
        // FIXME: ...and clear unused part of the journal (with some interval, not for every flushed op)
        wait_state = 0;
        flusher->active_flushers--;
        goto resume_0;
    }
}
|