forked from vitalif/vitastor
Track unstable writes
parent
82a2b8e7d9
commit
50cf3667fa
|
@ -50,6 +50,7 @@
|
|||
|
||||
#define IS_IN_FLIGHT(st) (st == ST_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED)
|
||||
#define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT)
|
||||
#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_META_SYNCED)
|
||||
#define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_STABLE)
|
||||
#define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_STABLE)
|
||||
#define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE)
|
||||
|
@ -252,6 +253,7 @@ class blockstore
|
|||
std::map<obj_ver_id, dirty_entry> dirty_db;
|
||||
std::list<blockstore_operation*> submit_queue; // FIXME: funny thing is that vector is better here
|
||||
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
|
||||
std::map<object_id, uint64_t> unstable_writes;
|
||||
std::list<blockstore_operation*> in_progress_syncs; // ...and probably here, too
|
||||
uint32_t block_order, block_size;
|
||||
uint64_t block_count;
|
||||
|
|
|
@ -355,6 +355,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
|||
.journal_sector = proc_pos,
|
||||
});
|
||||
bs->journal.used_sectors[proc_pos]++;
|
||||
auto & unstab = bs->unstable_writes[ov.oid];
|
||||
unstab = !unstab || unstab > ov.version ? ov.version : unstab;
|
||||
}
|
||||
else if (je->type == JE_BIG_WRITE)
|
||||
{
|
||||
|
@ -373,6 +375,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
|||
.journal_sector = proc_pos,
|
||||
});
|
||||
bs->journal.used_sectors[proc_pos]++;
|
||||
auto & unstab = bs->unstable_writes[ov.oid];
|
||||
unstab = !unstab || unstab > ov.version ? ov.version : unstab;
|
||||
}
|
||||
else if (je->type == JE_STABLE)
|
||||
{
|
||||
|
@ -404,6 +408,11 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
|||
}
|
||||
bs->flusher->queue_flush(ov);
|
||||
}
|
||||
auto unstab_it = bs->unstable_writes.find(ov.oid);
|
||||
if (unstab_it != bs->unstable_writes.end() && unstab_it->second <= ov.version)
|
||||
{
|
||||
bs->unstable_writes.erase(unstab_it);
|
||||
}
|
||||
}
|
||||
else if (je->type == JE_DELETE)
|
||||
{
|
||||
|
|
|
@ -96,7 +96,7 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
|
|||
{
|
||||
dirty_entry& dirty = dirty_it->second;
|
||||
bool version_ok = read_op->version >= dirty_it->first.version;
|
||||
if (IS_STABLE(dirty.state))
|
||||
if (IS_SYNCED(dirty.state))
|
||||
{
|
||||
if (!version_ok && read_op->version != 0)
|
||||
read_op->version = dirty_it->first.version;
|
||||
|
|
|
@ -22,11 +22,10 @@
|
|||
// AND We must do it in batches, for the sake of reduced fsync call count
|
||||
// AND We must know what we stabilize. Basic workflow is like:
|
||||
// 1) primary OSD receives sync request
|
||||
// 2) it determines his own unsynced writes from blockstore's information
|
||||
// just before submitting fsync
|
||||
// 3) it submits syncs to blockstore and peers
|
||||
// 4) after everyone acks sync it takes the object list and sends stabilize requests to everyone
|
||||
// 5) after everyone acks stabilize requests it acks the client's sync operation
|
||||
// 2) it submits syncs to blockstore and peers
|
||||
// 3) after everyone acks sync it acks sync to the client
|
||||
// 4) after a while it takes his synced object list and sends stabilize requests
|
||||
// to peers and to its own blockstore, thus freeing the old version
|
||||
|
||||
int blockstore::dequeue_stable(blockstore_operation *op)
|
||||
{
|
||||
|
|
|
@ -148,10 +148,14 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op)
|
|||
op->sync_state = SYNC_DONE;
|
||||
for (auto it = op->sync_big_writes.begin(); it != op->sync_big_writes.end(); it++)
|
||||
{
|
||||
auto & unstab = unstable_writes[it->oid];
|
||||
unstab = !unstab || unstab > it->version ? it->version : unstab;
|
||||
dirty_db[*it].state = ST_D_META_SYNCED;
|
||||
}
|
||||
for (auto it = op->sync_small_writes.begin(); it != op->sync_small_writes.end(); it++)
|
||||
{
|
||||
auto & unstab = unstable_writes[it->oid];
|
||||
unstab = !unstab || unstab > it->version ? it->version : unstab;
|
||||
dirty_db[*it].state = ST_J_SYNCED;
|
||||
}
|
||||
ack_sync(op);
|
||||
|
|
Loading…
Reference in New Issue