forked from vitalif/vitastor
Fix another stall due to bad unstable_writes tracking, do not try to write beyond the end of the journal
parent
2630e2e3b9
commit
35a6ed728d
|
@ -114,7 +114,7 @@ void blockstore::loop()
|
||||||
auto op_ptr = cur;
|
auto op_ptr = cur;
|
||||||
auto op = *(cur++);
|
auto op = *(cur++);
|
||||||
// FIXME: This needs some simplification
|
// FIXME: This needs some simplification
|
||||||
// Writes should not block reads if the ring is not null and if reads don't depend on them
|
// Writes should not block reads if the ring is not full and if reads don't depend on them
|
||||||
// In all other cases we should stop submission
|
// In all other cases we should stop submission
|
||||||
if (op->wait_for)
|
if (op->wait_for)
|
||||||
{
|
{
|
||||||
|
|
|
@ -58,7 +58,7 @@ void journal_flusher_t::loop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void journal_flusher_t::queue_flush(obj_ver_id ov)
|
void journal_flusher_t::enqueue_flush(obj_ver_id ov)
|
||||||
{
|
{
|
||||||
auto it = flush_versions.find(ov.oid);
|
auto it = flush_versions.find(ov.oid);
|
||||||
if (it != flush_versions.end())
|
if (it != flush_versions.end())
|
||||||
|
|
|
@ -70,6 +70,6 @@ public:
|
||||||
~journal_flusher_t();
|
~journal_flusher_t();
|
||||||
void loop();
|
void loop();
|
||||||
bool is_active();
|
bool is_active();
|
||||||
void queue_flush(obj_ver_id oid);
|
void enqueue_flush(obj_ver_id oid);
|
||||||
void unshift_flush(obj_ver_id oid);
|
void unshift_flush(obj_ver_id oid);
|
||||||
};
|
};
|
||||||
|
|
|
@ -86,7 +86,7 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int
|
||||||
{
|
{
|
||||||
auto clean_it = bs->clean_db.find(entries[i].oid);
|
auto clean_it = bs->clean_db.find(entries[i].oid);
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
printf("Clean entry %lu: %lu:%lu v%lu\n", done_cnt+i, entries[i].oid.inode, entries[i].oid.stripe, entries[i].version);
|
printf("Clean entry %u: %lu:%lu v%lu\n", done_cnt+i, entries[i].oid.inode, entries[i].oid.stripe, entries[i].version);
|
||||||
#endif
|
#endif
|
||||||
if (clean_it == bs->clean_db.end() || clean_it->second.version < entries[i].version)
|
if (clean_it == bs->clean_db.end() || clean_it->second.version < entries[i].version)
|
||||||
{
|
{
|
||||||
|
@ -359,8 +359,11 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
||||||
.journal_sector = proc_pos,
|
.journal_sector = proc_pos,
|
||||||
});
|
});
|
||||||
bs->journal.used_sectors[proc_pos]++;
|
bs->journal.used_sectors[proc_pos]++;
|
||||||
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
printf("journal offset %lu is used by %lu:%lu v%lu\n", proc_pos, ov.oid.inode, ov.oid.stripe, ov.version);
|
||||||
|
#endif
|
||||||
auto & unstab = bs->unstable_writes[ov.oid];
|
auto & unstab = bs->unstable_writes[ov.oid];
|
||||||
unstab = !unstab || unstab > ov.version ? ov.version : unstab;
|
unstab = unstab < ov.version ? ov.version : unstab;
|
||||||
}
|
}
|
||||||
else if (je->type == JE_BIG_WRITE)
|
else if (je->type == JE_BIG_WRITE)
|
||||||
{
|
{
|
||||||
|
@ -380,7 +383,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
||||||
});
|
});
|
||||||
bs->journal.used_sectors[proc_pos]++;
|
bs->journal.used_sectors[proc_pos]++;
|
||||||
auto & unstab = bs->unstable_writes[ov.oid];
|
auto & unstab = bs->unstable_writes[ov.oid];
|
||||||
unstab = !unstab || unstab > ov.version ? ov.version : unstab;
|
unstab = unstab < ov.version ? ov.version : unstab;
|
||||||
}
|
}
|
||||||
else if (je->type == JE_STABLE)
|
else if (je->type == JE_STABLE)
|
||||||
{
|
{
|
||||||
|
@ -410,7 +413,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
||||||
if (it->first.oid != ov.oid || IS_STABLE(it->second.state))
|
if (it->first.oid != ov.oid || IS_STABLE(it->second.state))
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
bs->flusher->queue_flush(ov);
|
bs->flusher->enqueue_flush(ov);
|
||||||
}
|
}
|
||||||
auto unstab_it = bs->unstable_writes.find(ov.oid);
|
auto unstab_it = bs->unstable_writes.find(ov.oid);
|
||||||
if (unstab_it != bs->unstable_writes.end() && unstab_it->second <= ov.version)
|
if (unstab_it != bs->unstable_writes.end() && unstab_it->second <= ov.version)
|
||||||
|
|
|
@ -171,7 +171,7 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
printf("enqueue_flush %lu:%lu v%lu\n", v->oid.inode, v->oid.stripe, v->version);
|
printf("enqueue_flush %lu:%lu v%lu\n", v->oid.inode, v->oid.stripe, v->version);
|
||||||
#endif
|
#endif
|
||||||
flusher->queue_flush(*v);
|
flusher->enqueue_flush(*v);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Acknowledge op
|
// Acknowledge op
|
||||||
|
|
|
@ -146,6 +146,8 @@ int blockstore::dequeue_write(blockstore_operation *op)
|
||||||
dirty_it->second.location = journal.next_free;
|
dirty_it->second.location = journal.next_free;
|
||||||
dirty_it->second.state = ST_J_SUBMITTED;
|
dirty_it->second.state = ST_J_SUBMITTED;
|
||||||
journal.next_free += op->len;
|
journal.next_free += op->len;
|
||||||
|
if (journal.next_free >= journal.len)
|
||||||
|
journal.next_free = 512;
|
||||||
op->pending_ops = 2;
|
op->pending_ops = 2;
|
||||||
// Remember small write as unsynced
|
// Remember small write as unsynced
|
||||||
unsynced_small_writes.push_back((obj_ver_id){
|
unsynced_small_writes.push_back((obj_ver_id){
|
||||||
|
|
Loading…
Reference in New Issue