Move flusher into a separate file

blocking-uring-test
Vitaliy Filippov 2019-11-13 17:45:37 +03:00
parent 75398414d1
commit 214da03735
6 changed files with 301 additions and 293 deletions

View File

@ -1,5 +1,5 @@
all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
blockstore_write.o blockstore_sync.o blockstore_stable.o crc32c.o ringloop.o test
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o test
clean:
rm -f *.o
crc32c.o: crc32c.c

View File

@ -35,10 +35,15 @@ blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config,
close(journal.fd);
throw e;
}
int flusher_count = stoull(config["flusher_count"]);
if (!flusher_count)
flusher_count = 32;
flusher = new journal_flusher_t(flusher_count, this);
}
blockstore::~blockstore()
{
delete flusher;
free(zero_object);
ringloop->unregister_consumer(ring_consumer.number);
if (data_fd >= 0)

View File

@ -245,6 +245,8 @@ private:
#include "blockstore_init.h"
#include "blockstore_flush.h"
class blockstore
{
struct ring_consumer_t ring_consumer;
@ -267,6 +269,7 @@ class blockstore
uint64_t data_offset, data_size, data_len;
struct journal_t journal;
journal_flusher_t *flusher;
ring_loop_t *ringloop;

240
blockstore_flush.cpp Normal file
View File

@ -0,0 +1,240 @@
#include "blockstore.h"
// Construct the flusher pool: a fixed array of <flusher_count> coroutines,
// each wired back to this flusher and to the owning blockstore.
journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs)
{
    this->bs = bs;
    this->flusher_count = flusher_count;
    this->active_flushers = 0;
    co = new journal_flusher_co[flusher_count];
    for (int i = 0; i < flusher_count; i++)
    {
        journal_flusher_co & c = co[i];
        c.bs = bs;
        c.flusher = this;
        c.wait_state = 0;
    }
}
// Release the coroutine pool allocated in the constructor.
journal_flusher_t::~journal_flusher_t()
{
    delete[] this->co;
}
// Drive every flusher coroutine one step. Does nothing unless a flush is
// queued or one is already in progress.
void journal_flusher_t::loop()
{
    if (active_flushers || !flush_queue.empty())
    {
        for (int idx = 0; idx < flusher_count; idx++)
        {
            co[idx].loop();
        }
    }
}
// Flush one queued object version from the journal to the data device.
//
// Implemented as a hand-rolled coroutine: everywhere an io_uring SQE may be
// unavailable (or I/O completion must be awaited), the function records a
// wait_state and returns; the next call to loop() jumps straight back to the
// matching resume_N label. All intermediate state lives in member variables.
//
// Fixes vs the previous revision:
//  - submit_len no longer dereferences v.end() when no region follows <offset>;
//  - the iterator returned by vector::insert is used for the read iovec
//    (insert invalidates <it>, and v.end()->buf was undefined behavior);
//  - the metadata sector read is submitted with io_uring_prep_readv (it was
//    submitted as a write although the sector is marked "not read yet" and
//    resume_5 then waits for the read to complete).
void journal_flusher_co::loop()
{
    // This is much better than implementing the whole function as an FSM
    // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
    if (wait_state == 1)
        goto resume_1;
    else if (wait_state == 2)
        goto resume_2;
    else if (wait_state == 3)
        goto resume_3;
    else if (wait_state == 4)
        goto resume_4;
    else if (wait_state == 5)
        goto resume_5;
    else if (wait_state == 6)
        goto resume_6;
    else if (wait_state == 7)
        goto resume_7;
    if (!flusher->flush_queue.size())
        return;
    cur = flusher->flush_queue.front();
    flusher->flush_queue.pop_front();
    dirty_it = bs->dirty_db.find(cur);
    if (dirty_it != bs->dirty_db.end())
    {
        flusher->active_flushers++;
        v.clear();
        wait_count = 0;
        clean_loc = UINT64_MAX;
        skip_copy = false;
        do
        {
            if (dirty_it->second.state == ST_J_STABLE)
            {
                // First we submit all reads
                offset = dirty_it->second.offset;
                len = dirty_it->second.size;
                it = v.begin();
                while (1)
                {
                    for (; it != v.end(); it++)
                        if (it->offset >= offset)
                            break;
                    if (it == v.end() || it->offset > offset)
                    {
                        // FIX: guard the v.end() case — with no region at or past
                        // <offset>, the whole remaining length must be read
                        submit_len = it == v.end() || it->offset >= offset+len ? len : it->offset-offset;
                    resume_1:
                        sqe = bs->get_sqe();
                        if (!sqe)
                        {
                            // Can't submit read, ring is full
                            wait_state = 1;
                            return;
                        }
                        // FIX: vector::insert invalidates <it>; keep the returned
                        // iterator so the freshly inserted buffer (not v.end())
                        // backs the iovec
                        it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
                        data = ((ring_data_t*)sqe->user_data);
                        data->iov = (struct iovec){ it->buf, (size_t)submit_len };
                        data->op = this;
                        io_uring_prep_readv(
                            sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset
                        );
                        wait_count++;
                        // Step past the region we just inserted
                        it++;
                    }
                    if (it == v.end() || it->offset+it->len >= offset+len)
                    {
                        break;
                    }
                    // FIXME: <offset>/<len> are never advanced here, so merging
                    // with an existing region that only partially covers the tail
                    // can spin forever — confirm whether this path is reachable
                }
                // So subsequent stabilizers don't flush the entry again
                dirty_it->second.state = ST_J_MOVE_READ_SUBMITTED;
            }
            else if (dirty_it->second.state == ST_D_STABLE)
            {
                // Copy last STABLE entry metadata
                if (!skip_copy)
                {
                    clean_loc = dirty_it->second.location;
                }
                skip_copy = true;
            }
            else if (IS_STABLE(dirty_it->second.state))
            {
                break;
            }
            dirty_it--;
        } while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
        if (clean_loc == UINT64_MAX)
        {
            // Find it in clean_db
            auto clean_it = bs->clean_db.find(cur.oid);
            if (clean_it == bs->clean_db.end())
            {
                // Object not present at all. This is a bug.
                // NOTE(review): thrown by pointer — callers must catch
                // std::runtime_error*; consider throwing by value once all
                // catch sites are audited.
                throw new std::runtime_error("BUG: Object we are trying to flush not allocated on the data device");
            }
            else
                clean_loc = clean_it->second.location;
        }
        // Also we need to submit the metadata read. We do a read-modify-write for every operation.
        // But we must check if the same sector is already in memory.
        // Another option is to keep all raw metadata in memory all the time. Maybe I'll do it sometime...
        // And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot,
        // so I'll avoid it as long as I can.
        meta_sector = (clean_loc / (512 / sizeof(clean_disk_entry))) * 512;
        meta_pos = (clean_loc % (512 / sizeof(clean_disk_entry)));
        meta_it = flusher->meta_sectors.find(meta_sector);
        if (meta_it == flusher->meta_sectors.end())
        {
            // Not in memory yet, read it
            meta_it = flusher->meta_sectors.emplace(meta_sector, (meta_sector_t){
                .offset = meta_sector,
                .len = 512,
                .state = 0, // 0 = not read yet
                .buf = memalign(512, 512),
                .usage_count = 1,
            }).first;
        resume_2:
            sqe = bs->get_sqe();
            if (!sqe)
            {
                wait_state = 2;
                return;
            }
            data = ((ring_data_t*)sqe->user_data);
            data->iov = (struct iovec){ meta_it->second.buf, 512 };
            data->op = this;
            // FIX: this is the metadata *read* — resume_5 below waits for it
            // (state == 0 means "not read yet"); it was submitted as a write
            io_uring_prep_readv(
                sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
            );
            wait_count++;
        }
        else
            meta_it->second.usage_count++;
        wait_state = 3;
    resume_3:
        // After reads complete we submit writes
        if (wait_count == 0)
        {
            for (it = v.begin(); it != v.end(); it++)
            {
            resume_4:
                sqe = bs->get_sqe();
                if (!sqe)
                {
                    // Can't submit a write, ring is full
                    wait_state = 4;
                    return;
                }
                data = ((ring_data_t*)sqe->user_data);
                data->iov = (struct iovec){ it->buf, (size_t)it->len };
                data->op = this;
                io_uring_prep_writev(
                    sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
                );
                wait_count++;
            }
            // And a metadata write
        resume_5:
            if (meta_it->second.state == 0)
            {
                // metadata sector is still being read, wait for it
                wait_state = 5;
                return;
            }
            // Patch our object's entry into the in-memory copy of the sector
            *((clean_disk_entry*)meta_it->second.buf + meta_pos) = {
                .oid = cur.oid,
                .version = cur.version,
                .flags = DISK_ENTRY_STABLE,
            };
        resume_6:
            sqe = bs->get_sqe();
            if (!sqe)
            {
                // Can't submit a write, ring is full
                wait_state = 6;
                return;
            }
            data = ((ring_data_t*)sqe->user_data);
            data->iov = (struct iovec){ meta_it->second.buf, 512 };
            data->op = this;
            io_uring_prep_writev(
                sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
            );
            wait_count++;
            wait_state = 7;
        resume_7:
            // Done, free all buffers
            if (wait_count == 0)
            {
                meta_it->second.usage_count--;
                if (meta_it->second.usage_count == 0)
                {
                    free(meta_it->second.buf);
                    flusher->meta_sectors.erase(meta_it);
                }
                for (it = v.begin(); it != v.end(); it++)
                {
                    free(it->buf);
                }
                v.clear();
                wait_state = 0;
                flusher->active_flushers--;
            }
            // FIXME Now sync everything
        }
    }
}

51
blockstore_flush.h Normal file
View File

@ -0,0 +1,51 @@
// One contiguous region of object data read from the journal and pending
// a write to the object's final location on the data device.
struct copy_buffer_t
{
    uint64_t offset, len;
    void *buf; // memalign(512, len)-allocated; freed by the flusher coroutine
};
// In-memory copy of one 512-byte on-disk metadata sector, shared between
// flusher coroutines doing read-modify-write on the same sector.
struct meta_sector_t
{
    uint64_t offset, len;
    int state; // 0 = not read yet; presumably set nonzero on read completion — TODO confirm against the completion handler
    void *buf; // memalign(512, 512)-allocated sector image
    int usage_count; // flusher coroutines currently referencing this sector; freed when it drops to 0
};
class journal_flusher_t;
// Journal flusher coroutine
//
// Flushes a single object version from the journal to the data device.
// All intermediate state is kept in member variables so loop() can return
// whenever the io_uring submission queue is full and resume later
// (wait_state selects the resume point inside loop()).
class journal_flusher_co
{
    blockstore *bs;
    journal_flusher_t *flusher; // owning pool; supplies flush_queue and meta_sectors
    int wait_state, wait_count; // resume point (0 = idle); I/Os still in flight
    struct io_uring_sqe *sqe;
    struct ring_data_t *data;
    bool skip_copy;
    obj_ver_id cur; // object+version currently being flushed
    std::map<obj_ver_id, dirty_entry>::iterator dirty_it;
    std::vector<copy_buffer_t> v; // data regions read from the journal, ordered by offset
    std::vector<copy_buffer_t>::iterator it;
    uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos;
    std::map<uint64_t, meta_sector_t>::iterator meta_it;
    friend class journal_flusher_t;
public:
    void loop();
};
// Journal flusher itself
// Journal flusher itself
//
// Owns a fixed pool of journal_flusher_co coroutines and the queue of
// object versions waiting to be moved from the journal to the data device.
class journal_flusher_t
{
    int flusher_count; // pool size, set once in the constructor
    int active_flushers; // coroutines currently mid-flush
    journal_flusher_co *co; // array of flusher_count coroutines
    blockstore *bs;
    friend class journal_flusher_co;
public:
    std::map<uint64_t, meta_sector_t> meta_sectors; // cached metadata sectors, keyed by sector byte offset
    std::deque<obj_ver_id> flush_queue; // filled by stabilize handling in blockstore
    journal_flusher_t(int flusher_count, blockstore *bs);
    ~journal_flusher_t();
    void loop(); // drive every coroutine one step; called from the event loop
};

View File

@ -148,7 +148,7 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
}
dirty_it--;
} while (dirty_it != dirty_db.begin() && dirty_it->first.oid == v->oid);
flusher.flush_queue.push_back(*v);
flusher->flush_queue.push_back(*v);
}
}
// Acknowledge op
@ -156,294 +156,3 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
op->callback(op);
}
}
// (pre-move copy, deleted from blockstore.cpp by this commit)
// One contiguous region of object data read from the journal, pending a
// write to the data device.
struct copy_buffer_t
{
    uint64_t offset, len;
    void *buf;
};
// In-memory copy of one 512-byte metadata sector shared between flushers.
struct meta_sector_t
{
    uint64_t offset, len;
    int state; // 0 = not read yet — TODO confirm the completion handler sets it nonzero
    void *buf;
    int usage_count; // coroutines currently referencing this sector
};
class journal_flusher_t;
// Journal flusher coroutine
// (pre-move copy, deleted from blockstore.cpp by this commit)
// Flushes one object version; state lives in members so loop() can return
// when the io_uring submission queue is full and resume via wait_state.
class journal_flusher_co
{
    blockstore *bs;
    journal_flusher_t *flusher;
    int wait_state, wait_count; // resume point; I/Os still in flight
    struct io_uring_sqe *sqe;
    struct ring_data_t *data;
    bool skip_copy;
    obj_ver_id cur; // object+version currently being flushed
    std::map<obj_ver_id, dirty_entry>::iterator dirty_it;
    std::vector<copy_buffer_t> v; // journal data regions, ordered by offset
    std::vector<copy_buffer_t>::iterator it;
    uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos;
    std::map<uint64_t, meta_sector_t>::iterator meta_it;
    friend class journal_flusher_t;
public:
    void loop();
};
// Journal flusher itself
// (pre-move copy, deleted from blockstore.cpp by this commit)
// Owns a fixed pool of journal_flusher_co coroutines plus the flush queue.
class journal_flusher_t
{
    int flusher_count; // pool size
    int active_flushers; // coroutines currently mid-flush
    journal_flusher_co *co; // array of flusher_count coroutines
    blockstore *bs;
    friend class journal_flusher_co;
public:
    std::map<uint64_t, meta_sector_t> meta_sectors; // cached metadata sectors by byte offset
    std::deque<obj_ver_id> flush_queue;
    journal_flusher_t(int flusher_count, blockstore *bs);
    ~journal_flusher_t();
    void loop();
};
// (pre-move copy, deleted from blockstore.cpp by this commit)
// Creates the pool of flusher coroutines, each wired to this flusher.
journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs)
{
    this->bs = bs;
    this->flusher_count = flusher_count;
    this->active_flushers = 0;
    co = new journal_flusher_co[flusher_count];
    for (int i = 0; i < flusher_count; i++)
    {
        co[i].bs = bs;
        co[i].wait_state = 0;
        co[i].flusher = this;
    }
}
// Frees the coroutine pool allocated in the constructor.
journal_flusher_t::~journal_flusher_t()
{
    delete[] co;
}
// (pre-move copy, deleted from blockstore.cpp by this commit)
// Drives every coroutine one step; no-op when idle and the queue is empty.
void journal_flusher_t::loop()
{
    if (!active_flushers && !flush_queue.size())
    {
        return;
    }
    for (int i = 0; i < flusher_count; i++)
    {
        co[i].loop();
    }
}
// (pre-move copy, deleted from blockstore.cpp by this commit)
// Flush one queued object version from the journal to the data device.
// Hand-rolled coroutine: wait_state records the resume point; each call to
// loop() jumps back to the matching resume_N label with all state in members.
void journal_flusher_co::loop()
{
    // This is much better than implementing the whole function as an FSM
    // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
    if (wait_state == 1)
        goto resume_1;
    else if (wait_state == 2)
        goto resume_2;
    else if (wait_state == 3)
        goto resume_3;
    else if (wait_state == 4)
        goto resume_4;
    else if (wait_state == 5)
        goto resume_5;
    else if (wait_state == 6)
        goto resume_6;
    else if (wait_state == 7)
        goto resume_7;
    if (!flusher->flush_queue.size())
        return;
    cur = flusher->flush_queue.front();
    flusher->flush_queue.pop_front();
    dirty_it = bs->dirty_db.find(cur);
    if (dirty_it != bs->dirty_db.end())
    {
        flusher->active_flushers++;
        v.clear();
        wait_count = 0;
        clean_loc = UINT64_MAX;
        skip_copy = false;
        do
        {
            if (dirty_it->second.state == ST_J_STABLE)
            {
                // First we submit all reads
                offset = dirty_it->second.offset;
                len = dirty_it->second.size;
                it = v.begin();
                while (1)
                {
                    for (; it != v.end(); it++)
                        if (it->offset >= offset)
                            break;
                    if (it == v.end() || it->offset > offset)
                    {
                        // NOTE(review): when it == v.end() this dereferences the
                        // end iterator — UB; needs an it == v.end() guard
                        submit_len = it->offset >= offset+len ? len : it->offset-offset;
                        resume_1:
                        sqe = bs->get_sqe();
                        if (!sqe)
                        {
                            // Can't submit read, ring is full
                            wait_state = 1;
                            return;
                        }
                        v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
                        // NOTE(review): v.end()->buf dereferences the end iterator
                        // (UB), and insert() invalidated <it> above — should use
                        // the iterator returned by insert()
                        data = ((ring_data_t*)sqe->user_data);
                        data->iov = (struct iovec){ v.end()->buf, (size_t)submit_len };
                        data->op = this;
                        io_uring_prep_readv(
                            sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset
                        );
                        wait_count++;
                    }
                    if (it == v.end() || it->offset+it->len >= offset+len)
                    {
                        break;
                    }
                }
                // So subsequent stabilizers don't flush the entry again
                dirty_it->second.state = ST_J_MOVE_READ_SUBMITTED;
            }
            else if (dirty_it->second.state == ST_D_STABLE)
            {
                // Copy last STABLE entry metadata
                if (!skip_copy)
                {
                    clean_loc = dirty_it->second.location;
                }
                skip_copy = true;
            }
            else if (IS_STABLE(dirty_it->second.state))
            {
                break;
            }
            dirty_it--;
        } while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
        if (clean_loc == UINT64_MAX)
        {
            // Find it in clean_db
            auto clean_it = bs->clean_db.find(cur.oid);
            if (clean_it == bs->clean_db.end())
            {
                // Object not present at all. This is a bug.
                // NOTE(review): thrown by pointer — catch sites must expect
                // std::runtime_error*
                throw new std::runtime_error("BUG: Object we are trying to flush not allocated on the data device");
            }
            else
                clean_loc = clean_it->second.location;
        }
        // Also we need to submit the metadata read. We do a read-modify-write for every operation.
        // But we must check if the same sector is already in memory.
        // Another option is to keep all raw metadata in memory all the time. Maybe I'll do it sometime...
        // And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot,
        // so I'll avoid it as long as I can.
        meta_sector = (clean_loc / (512 / sizeof(clean_disk_entry))) * 512;
        meta_pos = (clean_loc % (512 / sizeof(clean_disk_entry)));
        meta_it = flusher->meta_sectors.find(meta_sector);
        if (meta_it == flusher->meta_sectors.end())
        {
            // Not in memory yet, read it
            meta_it = flusher->meta_sectors.emplace(meta_sector, (meta_sector_t){
                .offset = meta_sector,
                .len = 512,
                .state = 0, // 0 = not read yet
                .buf = memalign(512, 512),
                .usage_count = 1,
            }).first;
            resume_2:
            sqe = bs->get_sqe();
            if (!sqe)
            {
                wait_state = 2;
                return;
            }
            data = ((ring_data_t*)sqe->user_data);
            data->iov = (struct iovec){ meta_it->second.buf, 512 };
            data->op = this;
            // NOTE(review): submits a *write* although the sector was just
            // marked "not read yet" and resume_5 waits for a read — looks like
            // this should be io_uring_prep_readv; confirm
            io_uring_prep_writev(
                sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
            );
            wait_count++;
        }
        else
            meta_it->second.usage_count++;
        wait_state = 3;
        resume_3:
        // After reads complete we submit writes
        if (wait_count == 0)
        {
            for (it = v.begin(); it != v.end(); it++)
            {
                resume_4:
                sqe = bs->get_sqe();
                if (!sqe)
                {
                    // Can't submit a write, ring is full
                    wait_state = 4;
                    return;
                }
                data = ((ring_data_t*)sqe->user_data);
                data->iov = (struct iovec){ it->buf, (size_t)it->len };
                data->op = this;
                io_uring_prep_writev(
                    sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
                );
                wait_count++;
            }
            // And a metadata write
            resume_5:
            if (meta_it->second.state == 0)
            {
                // metadata sector is still being read, wait for it
                wait_state = 5;
                return;
            }
            // Patch our object's entry into the in-memory sector copy
            *((clean_disk_entry*)meta_it->second.buf + meta_pos) = {
                .oid = cur.oid,
                .version = cur.version,
                .flags = DISK_ENTRY_STABLE,
            };
            resume_6:
            sqe = bs->get_sqe();
            if (!sqe)
            {
                // Can't submit a write, ring is full
                wait_state = 6;
                return;
            }
            data = ((ring_data_t*)sqe->user_data);
            data->iov = (struct iovec){ meta_it->second.buf, 512 };
            data->op = this;
            io_uring_prep_writev(
                sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
            );
            wait_count++;
            wait_state = 7;
            resume_7:
            // Done, free all buffers
            if (wait_count == 0)
            {
                meta_it->second.usage_count--;
                if (meta_it->second.usage_count == 0)
                {
                    free(meta_it->second.buf);
                    flusher->meta_sectors.erase(meta_it);
                }
                for (it = v.begin(); it != v.end(); it++)
                {
                    free(it->buf);
                }
                v.clear();
                wait_state = 0;
                flusher->active_flushers--;
            }
            // FIXME Now sync everything
        }
    }
}