forked from vitalif/vitastor
Sync metadata & data after copying from journal
parent
bc549553e4
commit
f1e236c6e8
|
@ -5,15 +5,30 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs)
|
|||
this->bs = bs;
|
||||
this->flusher_count = flusher_count;
|
||||
this->active_flushers = 0;
|
||||
this->active_until_sync = 0;
|
||||
this->sync_required = true;
|
||||
this->sync_threshold = flusher_count == 1 ? 1 : flusher_count/2;
|
||||
co = new journal_flusher_co[flusher_count];
|
||||
for (int i = 0; i < flusher_count; i++)
|
||||
{
|
||||
co[i].bs = bs;
|
||||
co[i].wait_state = 0;
|
||||
co[i].flusher = this;
|
||||
}
|
||||
}
|
||||
|
||||
journal_flusher_co::journal_flusher_co()
|
||||
{
|
||||
wait_state = 0;
|
||||
simple_callback = [this](ring_data_t* data)
|
||||
{
|
||||
if (data->res < 0)
|
||||
{
|
||||
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
|
||||
}
|
||||
wait_count--;
|
||||
};
|
||||
}
|
||||
|
||||
journal_flusher_t::~journal_flusher_t()
|
||||
{
|
||||
delete[] co;
|
||||
|
@ -31,6 +46,16 @@ void journal_flusher_t::loop()
|
|||
}
|
||||
}
|
||||
|
||||
#define await_sqe(label) \
|
||||
resume_##label:\
|
||||
sqe = bs->get_sqe();\
|
||||
if (!sqe)\
|
||||
{\
|
||||
wait_state = label;\
|
||||
return;\
|
||||
}\
|
||||
data = ((ring_data_t*)sqe->user_data);
|
||||
|
||||
void journal_flusher_co::loop()
|
||||
{
|
||||
// This is much better than implementing the whole function as an FSM
|
||||
|
@ -49,6 +74,15 @@ void journal_flusher_co::loop()
|
|||
goto resume_6;
|
||||
else if (wait_state == 7)
|
||||
goto resume_7;
|
||||
else if (wait_state == 8)
|
||||
goto resume_8;
|
||||
else if (wait_state == 9)
|
||||
goto resume_9;
|
||||
else if (wait_state == 10)
|
||||
goto resume_10;
|
||||
else if (wait_state == 11)
|
||||
goto resume_11;
|
||||
resume_0:
|
||||
if (!flusher->flush_queue.size())
|
||||
return;
|
||||
cur = flusher->flush_queue.front();
|
||||
|
@ -57,6 +91,7 @@ void journal_flusher_co::loop()
|
|||
if (dirty_it != bs->dirty_db.end())
|
||||
{
|
||||
flusher->active_flushers++;
|
||||
flusher->active_until_sync++;
|
||||
v.clear();
|
||||
wait_count = 0;
|
||||
clean_loc = UINT64_MAX;
|
||||
|
@ -77,21 +112,10 @@ void journal_flusher_co::loop()
|
|||
if (it == v.end() || it->offset > offset)
|
||||
{
|
||||
submit_len = it->offset >= offset+len ? len : it->offset-offset;
|
||||
resume_1:
|
||||
sqe = bs->get_sqe();
|
||||
if (!sqe)
|
||||
{
|
||||
// Can't submit read, ring is full
|
||||
wait_state = 1;
|
||||
return;
|
||||
}
|
||||
await_sqe(1);
|
||||
v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
|
||||
data = ((ring_data_t*)sqe->user_data);
|
||||
data->iov = (struct iovec){ v.end()->buf, (size_t)submit_len };
|
||||
data->callback = [this](ring_data_t* data)
|
||||
{
|
||||
wait_count--;
|
||||
};
|
||||
data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
|
||||
data->callback = simple_callback;
|
||||
io_uring_prep_readv(
|
||||
sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset
|
||||
);
|
||||
|
@ -150,17 +174,14 @@ void journal_flusher_co::loop()
|
|||
.buf = memalign(512, 512),
|
||||
.usage_count = 1,
|
||||
}).first;
|
||||
resume_2:
|
||||
sqe = bs->get_sqe();
|
||||
if (!sqe)
|
||||
{
|
||||
wait_state = 2;
|
||||
return;
|
||||
}
|
||||
data = ((ring_data_t*)sqe->user_data);
|
||||
await_sqe(2);
|
||||
data->iov = (struct iovec){ meta_it->second.buf, 512 };
|
||||
data->callback = [this](ring_data_t* data)
|
||||
{
|
||||
if (data->res < 0)
|
||||
{
|
||||
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
|
||||
}
|
||||
meta_it->second.state = 1;
|
||||
wait_count--;
|
||||
};
|
||||
|
@ -173,80 +194,103 @@ void journal_flusher_co::loop()
|
|||
meta_it->second.usage_count++;
|
||||
wait_state = 3;
|
||||
resume_3:
|
||||
// After reads complete we submit writes
|
||||
if (wait_count == 0)
|
||||
if (wait_count > 0)
|
||||
return;
|
||||
// Reads completed, submit writes
|
||||
for (it = v.begin(); it != v.end(); it++)
|
||||
{
|
||||
for (it = v.begin(); it != v.end(); it++)
|
||||
{
|
||||
resume_4:
|
||||
sqe = bs->get_sqe();
|
||||
if (!sqe)
|
||||
{
|
||||
// Can't submit a write, ring is full
|
||||
wait_state = 4;
|
||||
return;
|
||||
}
|
||||
data = ((ring_data_t*)sqe->user_data);
|
||||
data->iov = (struct iovec){ it->buf, (size_t)it->len };
|
||||
data->callback = [this](ring_data_t* data)
|
||||
{
|
||||
wait_count--;
|
||||
};
|
||||
io_uring_prep_writev(
|
||||
sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
|
||||
);
|
||||
wait_count++;
|
||||
}
|
||||
// And a metadata write
|
||||
resume_5:
|
||||
if (meta_it->second.state == 0)
|
||||
{
|
||||
// metadata sector is still being read, wait for it
|
||||
wait_state = 5;
|
||||
return;
|
||||
}
|
||||
*((clean_disk_entry*)meta_it->second.buf + meta_pos) = {
|
||||
.oid = cur.oid,
|
||||
.version = cur.version,
|
||||
};
|
||||
resume_6:
|
||||
sqe = bs->get_sqe();
|
||||
if (!sqe)
|
||||
{
|
||||
// Can't submit a write, ring is full
|
||||
wait_state = 6;
|
||||
return;
|
||||
}
|
||||
data = ((ring_data_t*)sqe->user_data);
|
||||
data->iov = (struct iovec){ meta_it->second.buf, 512 };
|
||||
data->callback = [this](ring_data_t* data)
|
||||
{
|
||||
wait_count--;
|
||||
};
|
||||
await_sqe(4);
|
||||
data->iov = (struct iovec){ it->buf, (size_t)it->len };
|
||||
data->callback = simple_callback;
|
||||
io_uring_prep_writev(
|
||||
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
|
||||
sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
|
||||
);
|
||||
wait_count++;
|
||||
wait_state = 7;
|
||||
resume_7:
|
||||
// Done, free all buffers
|
||||
if (wait_count == 0)
|
||||
{
|
||||
meta_it->second.usage_count--;
|
||||
if (meta_it->second.usage_count == 0)
|
||||
{
|
||||
free(meta_it->second.buf);
|
||||
flusher->meta_sectors.erase(meta_it);
|
||||
}
|
||||
for (it = v.begin(); it != v.end(); it++)
|
||||
{
|
||||
free(it->buf);
|
||||
}
|
||||
v.clear();
|
||||
wait_state = 0;
|
||||
flusher->active_flushers--;
|
||||
}
|
||||
// FIXME Now sync everything
|
||||
}
|
||||
// And a metadata write
|
||||
resume_5:
|
||||
if (meta_it->second.state == 0)
|
||||
{
|
||||
// metadata sector is still being read, wait for it
|
||||
wait_state = 5;
|
||||
return;
|
||||
}
|
||||
*((clean_disk_entry*)meta_it->second.buf + meta_pos) = {
|
||||
.oid = cur.oid,
|
||||
.version = cur.version,
|
||||
};
|
||||
// I consider unordered writes to data & metadata safe here, because
|
||||
// "dirty" entries always override "clean" entries in our case
|
||||
await_sqe(6);
|
||||
data->iov = (struct iovec){ meta_it->second.buf, 512 };
|
||||
data->callback = simple_callback;
|
||||
io_uring_prep_writev(
|
||||
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
|
||||
);
|
||||
wait_count++;
|
||||
wait_state = 7;
|
||||
resume_7:
|
||||
if (wait_count > 0)
|
||||
return;
|
||||
// Done, free all buffers
|
||||
meta_it->second.usage_count--;
|
||||
if (meta_it->second.usage_count == 0)
|
||||
{
|
||||
free(meta_it->second.buf);
|
||||
flusher->meta_sectors.erase(meta_it);
|
||||
}
|
||||
for (it = v.begin(); it != v.end(); it++)
|
||||
{
|
||||
free(it->buf);
|
||||
}
|
||||
v.clear();
|
||||
flusher->active_until_sync--;
|
||||
if (flusher->sync_required)
|
||||
{
|
||||
// And sync everything (in batches - not per each operation!)
|
||||
cur_sync = flusher->syncs.end();
|
||||
if (cur_sync == flusher->syncs.begin())
|
||||
cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){ .ready_count = 0, .state = 0 });
|
||||
else
|
||||
cur_sync--;
|
||||
cur_sync->ready_count++;
|
||||
if (cur_sync->ready_count >= flusher->sync_threshold ||
|
||||
!flusher->active_until_sync && !flusher->flush_queue.size())
|
||||
{
|
||||
// Sync batch is ready. Do it.
|
||||
await_sqe(9);
|
||||
data->callback = simple_callback;
|
||||
io_uring_prep_fsync(sqe, bs->data_fd, 0);
|
||||
wait_count++;
|
||||
if (bs->meta_fd != bs->data_fd)
|
||||
{
|
||||
await_sqe(10);
|
||||
data->callback = simple_callback;
|
||||
io_uring_prep_fsync(sqe, bs->meta_fd, 0);
|
||||
wait_count++;
|
||||
}
|
||||
wait_state = 11;
|
||||
resume_11:
|
||||
if (wait_count > 0)
|
||||
return;
|
||||
// Sync completed. All previous coroutines waiting for it must be resumed
|
||||
cur_sync->state = 1;
|
||||
}
|
||||
// Wait until someone else sends and completes a sync.
|
||||
resume_8:
|
||||
if (!cur_sync->state)
|
||||
{
|
||||
wait_state = 8;
|
||||
return;
|
||||
}
|
||||
cur_sync->ready_count--;
|
||||
if (cur_sync->ready_count == 0)
|
||||
{
|
||||
flusher->syncs.erase(cur_sync);
|
||||
}
|
||||
}
|
||||
wait_state = 0;
|
||||
flusher->active_flushers--;
|
||||
goto resume_0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,10 +12,16 @@ struct meta_sector_t
|
|||
int usage_count;
|
||||
};
|
||||
|
||||
struct flusher_sync_t
|
||||
{
|
||||
int ready_count;
|
||||
int state;
|
||||
};
|
||||
|
||||
class journal_flusher_t;
|
||||
|
||||
// Journal flusher coroutine
|
||||
struct journal_flusher_co
|
||||
class journal_flusher_co
|
||||
{
|
||||
blockstore *bs;
|
||||
journal_flusher_t *flusher;
|
||||
|
@ -29,8 +35,11 @@ struct journal_flusher_co
|
|||
std::vector<copy_buffer_t>::iterator it;
|
||||
uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos;
|
||||
std::map<uint64_t, meta_sector_t>::iterator meta_it;
|
||||
std::function<void(ring_data_t*)> simple_callback;
|
||||
std::list<flusher_sync_t>::iterator cur_sync;
|
||||
friend class journal_flusher_t;
|
||||
public:
|
||||
journal_flusher_co();
|
||||
void loop();
|
||||
};
|
||||
|
||||
|
@ -38,10 +47,14 @@ public:
|
|||
class journal_flusher_t
|
||||
{
|
||||
int flusher_count;
|
||||
int active_flushers;
|
||||
int sync_threshold;
|
||||
bool sync_required;
|
||||
journal_flusher_co *co;
|
||||
blockstore *bs;
|
||||
friend class journal_flusher_co;
|
||||
|
||||
int active_flushers, active_until_sync;
|
||||
std::list<flusher_sync_t> syncs;
|
||||
public:
|
||||
std::map<uint64_t, meta_sector_t> meta_sectors;
|
||||
std::deque<obj_ver_id> flush_queue;
|
||||
|
|
|
@ -2,9 +2,6 @@
|
|||
|
||||
// Stabilize small write:
|
||||
// 1) Copy data from the journal to the data device
|
||||
// Sync it before writing metadata if we want to keep metadata consistent
|
||||
// Overall it's optional because it can be replayed from the journal until
|
||||
// it's cleared, and reads are also fulfilled from the journal
|
||||
// 2) Increase version on the metadata device and sync it
|
||||
// 3) Advance clean_db entry's version, clear previous journal entries
|
||||
//
|
||||
|
@ -112,8 +109,6 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
|
|||
{
|
||||
if (data->res < 0)
|
||||
{
|
||||
// sync error
|
||||
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die
|
||||
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
|
||||
}
|
||||
op->pending_ops--;
|
||||
|
|
|
@ -111,8 +111,6 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op)
|
|||
{
|
||||
if (data->res < 0)
|
||||
{
|
||||
// sync error
|
||||
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die
|
||||
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
|
||||
}
|
||||
op->pending_ops--;
|
||||
|
|
|
@ -149,7 +149,6 @@ void blockstore::handle_write_event(ring_data_t *data, blockstore_operation *op)
|
|||
{
|
||||
if (data->res < 0)
|
||||
{
|
||||
// write error
|
||||
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die
|
||||
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue