forked from vitalif/vitastor
Switch object states after sync
parent
64185f7a1f
commit
7aabe11ef9
|
@ -213,9 +213,9 @@ private:
|
|||
uint64_t min_used_journal_sector, max_used_journal_sector;
|
||||
|
||||
// Sync
|
||||
std::deque<obj_ver_id> sync_big_writes;
|
||||
std::deque<obj_ver_id> sync_big_writes, sync_small_writes;
|
||||
std::list<blockstore_operation*>::iterator in_progress_ptr;
|
||||
int big_write_count, sync_state, prev_sync_count;
|
||||
int sync_state, prev_sync_count;
|
||||
};
|
||||
|
||||
class blockstore;
|
||||
|
@ -230,8 +230,7 @@ class blockstore
|
|||
spp::sparse_hash_map<object_id, clean_entry, oid_hash> object_db;
|
||||
std::map<obj_ver_id, dirty_entry> dirty_db;
|
||||
std::list<blockstore_operation*> submit_queue;
|
||||
std::deque<obj_ver_id> unsynced_big_writes;
|
||||
int unsynced_small_writes = 0;
|
||||
std::deque<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
|
||||
std::list<blockstore_operation*> in_progress_syncs;
|
||||
uint32_t block_order, block_size;
|
||||
uint64_t block_count;
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
#include "blockstore.h"
|
||||
|
||||
#define SYNC_NO_BIG 1
|
||||
#define SYNC_HAS_SMALL 1
|
||||
#define SYNC_HAS_BIG 2
|
||||
#define SYNC_DATA_SYNC_SENT 3
|
||||
#define SYNC_DATA_SYNC_DONE 4
|
||||
|
@ -11,17 +11,16 @@ int blockstore::dequeue_sync(blockstore_operation *op)
|
|||
{
|
||||
if (op->sync_state == 0)
|
||||
{
|
||||
op->big_write_count = 0;
|
||||
op->sync_big_writes.swap(unsynced_big_writes);
|
||||
op->big_write_count = op->sync_big_writes.size();
|
||||
if (op->big_write_count > 0)
|
||||
op->sync_small_writes.swap(unsynced_small_writes);
|
||||
if (op->sync_big_writes.size() > 0)
|
||||
op->sync_state = SYNC_HAS_BIG;
|
||||
else if (unsynced_small_writes == 0)
|
||||
op->sync_state = SYNC_DONE;
|
||||
else if (op->sync_small_writes.size() > 0)
|
||||
op->sync_state = SYNC_HAS_SMALL;
|
||||
else
|
||||
op->sync_state = SYNC_NO_BIG;
|
||||
op->sync_state = SYNC_DONE;
|
||||
unsynced_big_writes.clear();
|
||||
unsynced_small_writes = 0;
|
||||
unsynced_small_writes.clear();
|
||||
}
|
||||
int r = continue_sync(op);
|
||||
if (r)
|
||||
|
@ -38,7 +37,7 @@ int blockstore::dequeue_sync(blockstore_operation *op)
|
|||
|
||||
int blockstore::continue_sync(blockstore_operation *op)
|
||||
{
|
||||
if (op->sync_state == SYNC_NO_BIG)
|
||||
if (op->sync_state == SYNC_HAS_SMALL)
|
||||
{
|
||||
// No big writes, just fsync the journal
|
||||
BS_SUBMIT_GET_SQE(sqe, data);
|
||||
|
@ -60,7 +59,7 @@ int blockstore::continue_sync(blockstore_operation *op)
|
|||
{
|
||||
// 2nd step: Data device is synced, prepare & write journal entries
|
||||
// Check space in the journal and journal memory buffers
|
||||
int required = op->big_write_count, sectors_required = 1;
|
||||
int required = op->sync_big_writes.size(), sectors_required = 1;
|
||||
uint64_t next_pos = journal.next_free, next_sector = journal.cur_sector;
|
||||
while (1)
|
||||
{
|
||||
|
@ -94,7 +93,7 @@ int blockstore::continue_sync(blockstore_operation *op)
|
|||
// Prepare and submit journal entries
|
||||
op->min_used_journal_sector = 1 + journal.cur_sector;
|
||||
sectors_required = 0;
|
||||
required = op->big_write_count;
|
||||
required = op->sync_big_writes.size();
|
||||
auto it = op->sync_big_writes.begin();
|
||||
while (1)
|
||||
{
|
||||
|
@ -171,14 +170,26 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op)
|
|||
}
|
||||
op->min_used_journal_sector = op->max_used_journal_sector = 0;
|
||||
}
|
||||
// Handle state
|
||||
// Handle states
|
||||
if (op->sync_state == SYNC_DATA_SYNC_SENT)
|
||||
{
|
||||
op->sync_state = SYNC_DATA_SYNC_DONE;
|
||||
for (auto it = op->sync_big_writes.begin(); it != op->sync_big_writes.end(); it++)
|
||||
{
|
||||
dirty_db[*it].state = ST_D_SYNCED;
|
||||
}
|
||||
}
|
||||
else if (op->sync_state == SYNC_JOURNAL_SYNC_SENT)
|
||||
{
|
||||
op->sync_state = SYNC_DONE;
|
||||
for (auto it = op->sync_big_writes.begin(); it != op->sync_big_writes.end(); it++)
|
||||
{
|
||||
dirty_db[*it].state = ST_D_META_SYNCED;
|
||||
}
|
||||
for (auto it = op->sync_small_writes.begin(); it != op->sync_small_writes.end(); it++)
|
||||
{
|
||||
dirty_db[*it].state = ST_J_SYNCED;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -172,17 +172,28 @@ void blockstore::handle_write_event(ring_data_t *data, blockstore_operation *op)
|
|||
op->min_used_journal_sector = op->max_used_journal_sector = 0;
|
||||
}
|
||||
// Switch object state
|
||||
auto dirty_it = dirty_db.find((obj_ver_id){
|
||||
auto & dirty_entry = dirty_db[(obj_ver_id){
|
||||
.oid = op->oid,
|
||||
.version = op->version,
|
||||
});
|
||||
dirty_it->second.state = (dirty_it->second.state == ST_J_SUBMITTED
|
||||
? ST_J_WRITTEN : (dirty_it->second.state == ST_DEL_SUBMITTED ? ST_DEL_WRITTEN : ST_D_WRITTEN));
|
||||
}];
|
||||
if (dirty_entry.state == ST_J_SUBMITTED)
|
||||
{
|
||||
dirty_entry.state = ST_J_WRITTEN;
|
||||
}
|
||||
else if (dirty_entry.state == ST_D_SUBMITTED)
|
||||
{
|
||||
dirty_entry.state = ST_D_WRITTEN;
|
||||
}
|
||||
else if (dirty_entry.state == ST_DEL_SUBMITTED)
|
||||
{
|
||||
dirty_entry.state = ST_DEL_WRITTEN;
|
||||
}
|
||||
// Acknowledge write without sync
|
||||
op->retval = op->len;
|
||||
op->callback(op);
|
||||
// Remember write as unsynced
|
||||
if (IS_BIG_WRITE(dirty_it->second.state))
|
||||
// FIXME: Could state change to ST_STABLE? It could break this check
|
||||
if (IS_BIG_WRITE(dirty_entry.state))
|
||||
{
|
||||
unsynced_big_writes.push_back((obj_ver_id){
|
||||
.oid = op->oid,
|
||||
|
@ -191,7 +202,10 @@ void blockstore::handle_write_event(ring_data_t *data, blockstore_operation *op)
|
|||
}
|
||||
else
|
||||
{
|
||||
unsynced_small_writes++;
|
||||
unsynced_small_writes.push_back((obj_ver_id){
|
||||
.oid = op->oid,
|
||||
.version = op->version,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue