forked from vitalif/vitastor
Prevent two parallel flushes of the same object
parent
71194f014a
commit
c38c8ab029
|
@ -33,7 +33,6 @@
|
|||
#define ST_J_WRITTEN 3
|
||||
#define ST_J_SYNCED 4
|
||||
#define ST_J_STABLE 5
|
||||
#define ST_J_MOVE_READ_SUBMITTED 6
|
||||
#define ST_J_MOVE_WRITE_SUBMITTED 7
|
||||
#define ST_J_MOVE_SYNCED 8
|
||||
|
||||
|
@ -109,6 +108,11 @@ inline bool operator == (const object_id & a, const object_id & b)
|
|||
return a.inode == b.inode && a.stripe == b.stripe;
|
||||
}
|
||||
|
||||
inline bool operator != (const object_id & a, const object_id & b)
|
||||
{
|
||||
return a.inode != b.inode || a.stripe != b.stripe;
|
||||
}
|
||||
|
||||
inline bool operator < (const object_id & a, const object_id & b)
|
||||
{
|
||||
return a.inode < b.inode || a.inode == b.inode && a.stripe < b.stripe;
|
||||
|
|
|
@ -46,6 +46,34 @@ void journal_flusher_t::loop()
|
|||
}
|
||||
}
|
||||
|
||||
// Schedule object <ov.oid> for flushing with target version <ov.version>,
// appending it to the BACK of flush_queue.
//
// An object is present in flush_queue at most once; flush_versions holds the
// version requested for each queued object. If the object is already queued,
// its queue position is left untouched and only the target version is updated.
void journal_flusher_t::queue_flush(obj_ver_id ov)
{
    // Single lookup: inserts (oid -> version) only when the object is not queued yet
    auto ins = flush_versions.emplace(ov.oid, ov.version);
    if (ins.second)
    {
        flush_queue.push_back(ov.oid);
    }
    else if (ins.first->second < ov.version)
    {
        // Already queued: only ever raise the target version. Overwriting it
        // unconditionally would let a late request for an older version replace
        // a newer one, and the newer version would not be queued again.
        ins.first->second = ov.version;
    }
}
|
||||
|
||||
// Schedule object <ov.oid> for flushing with target version <ov.version>,
// putting it at the FRONT of flush_queue so it is picked up next.
//
// An object is present in flush_queue at most once; flush_versions holds the
// version requested for each queued object. If the object is already queued,
// its queue position is left untouched and only the target version is updated.
void journal_flusher_t::unshift_flush(obj_ver_id ov)
{
    // Single lookup: inserts (oid -> version) only when the object is not queued yet
    auto ins = flush_versions.emplace(ov.oid, ov.version);
    if (ins.second)
    {
        flush_queue.push_front(ov.oid);
    }
    else if (ins.first->second < ov.version)
    {
        // Already queued: only ever raise the target version. A requeue carries
        // a version that was recorded earlier, so it may be older than the one
        // queued in the meantime and must not replace it.
        ins.first->second = ov.version;
    }
}
|
||||
|
||||
#define await_sqe(label) \
|
||||
resume_##label:\
|
||||
sqe = bs->get_sqe();\
|
||||
|
@ -85,11 +113,26 @@ void journal_flusher_co::loop()
|
|||
resume_0:
|
||||
if (!flusher->flush_queue.size())
|
||||
return;
|
||||
cur = flusher->flush_queue.front();
|
||||
cur.oid = flusher->flush_queue.front();
|
||||
cur.version = flusher->flush_versions[cur.oid];
|
||||
flusher->flush_queue.pop_front();
|
||||
flusher->flush_versions.erase(cur.oid);
|
||||
dirty_end = bs->dirty_db.find(cur);
|
||||
if (dirty_end != bs->dirty_db.end())
|
||||
{
|
||||
repeat_it = flusher->sync_to_repeat.find(cur.oid);
|
||||
if (repeat_it != flusher->sync_to_repeat.end())
|
||||
{
|
||||
// We don't flush different parts of history of the same object in parallel
|
||||
// So we check if someone is already flushing this object
|
||||
// In that case we store the requested version in sync_to_repeat and pick another object
|
||||
// The coroutine that is flushing it will see this non-zero version and re-queue the object after it finishes
|
||||
repeat_it->second = cur.version;
|
||||
wait_state = 0;
|
||||
goto resume_0;
|
||||
}
|
||||
else
|
||||
repeat_it->second = 0;
|
||||
dirty_it = dirty_end;
|
||||
flusher->active_flushers++;
|
||||
flusher->active_until_sync++;
|
||||
|
@ -99,7 +142,7 @@ resume_0:
|
|||
skip_copy = false;
|
||||
do
|
||||
{
|
||||
if (dirty_it->second.state == ST_J_STABLE)
|
||||
if (dirty_it->second.state == ST_J_STABLE && !skip_copy)
|
||||
{
|
||||
// First we submit all reads
|
||||
offset = dirty_it->second.offset;
|
||||
|
@ -127,8 +170,6 @@ resume_0:
|
|||
break;
|
||||
}
|
||||
}
|
||||
// So subsequent stabilizers don't flush the entry again
|
||||
dirty_it->second.state = ST_J_MOVE_READ_SUBMITTED;
|
||||
}
|
||||
else if (dirty_it->second.state == ST_D_STABLE)
|
||||
{
|
||||
|
@ -139,16 +180,10 @@ resume_0:
|
|||
}
|
||||
skip_copy = true;
|
||||
}
|
||||
else if (IS_STABLE(dirty_it->second.state))
|
||||
{
|
||||
// Other coroutine is already flushing it, stop
|
||||
break;
|
||||
}
|
||||
else
|
||||
else if (!IS_STABLE(dirty_it->second.state))
|
||||
{
|
||||
throw new std::runtime_error("BUG: Unexpected dirty_entry state during flush: " + std::to_string(dirty_it->second.state));
|
||||
}
|
||||
dirty_start = dirty_it;
|
||||
dirty_it--;
|
||||
} while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
|
||||
if (wait_count == 0 && clean_loc == UINT64_MAX)
|
||||
|
@ -156,6 +191,13 @@ resume_0:
|
|||
// Nothing to flush
|
||||
flusher->active_flushers--;
|
||||
flusher->active_until_sync--;
|
||||
repeat_it = flusher->sync_to_repeat.find(cur.oid);
|
||||
if (repeat_it->second != 0)
|
||||
{
|
||||
// Requeue version
|
||||
flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
|
||||
}
|
||||
flusher->sync_to_repeat.erase(repeat_it);
|
||||
wait_state = 0;
|
||||
goto resume_0;
|
||||
}
|
||||
|
@ -313,24 +355,34 @@ resume_0:
|
|||
.version = cur.version,
|
||||
.location = clean_loc,
|
||||
};
|
||||
for (dirty_it = dirty_start; dirty_it != dirty_end; dirty_it++)
|
||||
dirty_it = dirty_end;
|
||||
do
|
||||
{
|
||||
if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
|
||||
{
|
||||
allocator_set(bs->data_alloc, dirty_it->second.location >> bs->block_order, false);
|
||||
}
|
||||
int used = --bs->journal.used_sectors[dirty_it->second.journal_sector];
|
||||
if (used == 1)
|
||||
if (used == 0)
|
||||
{
|
||||
bs->journal.used_sectors.erase(dirty_it->second.journal_sector);
|
||||
}
|
||||
}
|
||||
// Then, basically, remove the whole version range from dirty_db...
|
||||
// FIXME not until dirty_start, until other object. And wait for previous flushes.
|
||||
bs->dirty_db.erase(dirty_start, std::next(dirty_end));
|
||||
dirty_it--;
|
||||
} while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
|
||||
// Then, basically, remove everything up to the current version from dirty_db...
|
||||
if (dirty_it->first.oid != cur.oid)
|
||||
dirty_it++;
|
||||
bs->dirty_db.erase(dirty_it, std::next(dirty_end));
|
||||
// FIXME: ...and clear unused part of the journal (with some interval, not for every flushed op)
|
||||
wait_state = 0;
|
||||
flusher->active_flushers--;
|
||||
repeat_it = flusher->sync_to_repeat.find(cur.oid);
|
||||
if (repeat_it->second != 0)
|
||||
{
|
||||
// Requeue version
|
||||
flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
|
||||
}
|
||||
flusher->sync_to_repeat.erase(repeat_it);
|
||||
goto resume_0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ class journal_flusher_co
|
|||
std::vector<copy_buffer_t>::iterator it;
|
||||
uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos;
|
||||
std::map<uint64_t, meta_sector_t>::iterator meta_it;
|
||||
std::map<object_id, uint64_t>::iterator repeat_it;
|
||||
std::function<void(ring_data_t*)> simple_callback;
|
||||
std::list<flusher_sync_t>::iterator cur_sync;
|
||||
friend class journal_flusher_t;
|
||||
|
@ -56,10 +57,15 @@ class journal_flusher_t
|
|||
|
||||
int active_flushers, active_until_sync;
|
||||
std::list<flusher_sync_t> syncs;
|
||||
public:
|
||||
std::map<object_id, uint64_t> sync_to_repeat;
|
||||
|
||||
std::map<uint64_t, meta_sector_t> meta_sectors;
|
||||
std::deque<obj_ver_id> flush_queue;
|
||||
std::deque<object_id> flush_queue;
|
||||
std::map<object_id, uint64_t> flush_versions;
|
||||
public:
|
||||
journal_flusher_t(int flusher_count, blockstore *bs);
|
||||
~journal_flusher_t();
|
||||
void loop();
|
||||
void queue_flush(obj_ver_id oid);
|
||||
void unshift_flush(obj_ver_id oid);
|
||||
};
|
||||
|
|
|
@ -139,7 +139,7 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
|
|||
}
|
||||
dirty_it--;
|
||||
} while (dirty_it != dirty_db.begin() && dirty_it->first.oid == v->oid);
|
||||
flusher->flush_queue.push_back(*v);
|
||||
flusher->queue_flush(*v);
|
||||
}
|
||||
}
|
||||
// Acknowledge op
|
||||
|
|
Loading…
Reference in New Issue