forked from vitalif/vitastor
Fix sparse reads using bitmap, fix journal replay (we could sometimes lose its end)
parent
5739b02de8
commit
36d8c8724f
|
@ -692,12 +692,12 @@ void journal_flusher_co::bitmap_set(void *bitmap, uint64_t start, uint64_t len)
|
||||||
{
|
{
|
||||||
if (len == 32*BITMAP_GRANULARITY)
|
if (len == 32*BITMAP_GRANULARITY)
|
||||||
{
|
{
|
||||||
*((uint32_t*)bitmap) = 1;
|
*((uint32_t*)bitmap) = UINT32_MAX;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
else if (len == 64*BITMAP_GRANULARITY)
|
else if (len == 64*BITMAP_GRANULARITY)
|
||||||
{
|
{
|
||||||
*((uint64_t*)bitmap) = 1;
|
*((uint64_t*)bitmap) = UINT64_MAX;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -707,7 +707,7 @@ void journal_flusher_co::bitmap_set(void *bitmap, uint64_t start, uint64_t len)
|
||||||
{
|
{
|
||||||
if (!(bit_start & 7) && bit_end >= bit_start+8)
|
if (!(bit_start & 7) && bit_end >= bit_start+8)
|
||||||
{
|
{
|
||||||
((uint8_t*)bitmap)[bit_start / 8] = 1;
|
((uint8_t*)bitmap)[bit_start / 8] = UINT8_MAX;
|
||||||
bit_start += 8;
|
bit_start += 8;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
@ -417,7 +417,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
||||||
if (pos == 0)
|
if (pos == 0)
|
||||||
{
|
{
|
||||||
// invalid entry in the beginning, this is definitely the end of the journal
|
// invalid entry in the beginning, this is definitely the end of the journal
|
||||||
bs->journal.next_free = next_free;
|
bs->journal.next_free = proc_pos;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -531,8 +531,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
||||||
.state = ST_D_META_SYNCED,
|
.state = ST_D_META_SYNCED,
|
||||||
.flags = 0,
|
.flags = 0,
|
||||||
.location = je->big_write.location,
|
.location = je->big_write.location,
|
||||||
.offset = 0,
|
.offset = je->big_write.offset,
|
||||||
.len = bs->block_size,
|
.len = je->big_write.len,
|
||||||
.journal_sector = proc_pos,
|
.journal_sector = proc_pos,
|
||||||
});
|
});
|
||||||
#ifdef BLOCKSTORE_DEBUG
|
#ifdef BLOCKSTORE_DEBUG
|
||||||
|
|
|
@ -55,6 +55,8 @@ struct __attribute__((__packed__)) journal_entry_big_write
|
||||||
uint32_t crc32_prev;
|
uint32_t crc32_prev;
|
||||||
object_id oid;
|
object_id oid;
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
|
uint32_t offset;
|
||||||
|
uint32_t len;
|
||||||
uint64_t location;
|
uint64_t location;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,10 @@ int blockstore_impl_t::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfille
|
||||||
.len = it == PRIV(read_op)->read_vec.end() || it->offset >= item_end ? item_end-cur_start : it->offset-cur_start,
|
.len = it == PRIV(read_op)->read_vec.end() || it->offset >= item_end ? item_end-cur_start : it->offset-cur_start,
|
||||||
};
|
};
|
||||||
it = PRIV(read_op)->read_vec.insert(it, el);
|
it = PRIV(read_op)->read_vec.insert(it, el);
|
||||||
if (!fulfill_read_push(read_op, read_op->buf + el.offset - read_op->offset, item_location + el.offset - item_start, el.len, item_state, item_version))
|
if (!fulfill_read_push(read_op,
|
||||||
|
read_op->buf + el.offset - read_op->offset,
|
||||||
|
item_location + el.offset - item_start,
|
||||||
|
el.len, item_state, item_version))
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -97,7 +100,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
|
||||||
PRIV(read_op)->pending_ops = 0;
|
PRIV(read_op)->pending_ops = 0;
|
||||||
if (dirty_found)
|
if (dirty_found)
|
||||||
{
|
{
|
||||||
while (dirty_it->first.oid == read_op->oid && fulfilled < read_op->len)
|
while (dirty_it->first.oid == read_op->oid)
|
||||||
{
|
{
|
||||||
dirty_entry& dirty = dirty_it->second;
|
dirty_entry& dirty = dirty_it->second;
|
||||||
bool version_ok = read_op->version >= dirty_it->first.version;
|
bool version_ok = read_op->version >= dirty_it->first.version;
|
||||||
|
@ -110,14 +113,14 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
|
||||||
if (version_ok)
|
if (version_ok)
|
||||||
{
|
{
|
||||||
if (!fulfill_read(read_op, fulfilled, dirty.offset, dirty.offset + dirty.len,
|
if (!fulfill_read(read_op, fulfilled, dirty.offset, dirty.offset + dirty.len,
|
||||||
dirty.state, dirty_it->first.version, dirty.location))
|
dirty.state, dirty_it->first.version, dirty.location + (IS_JOURNAL(dirty.state) ? 0 : dirty.offset)))
|
||||||
{
|
{
|
||||||
// need to wait. undo added requests, don't dequeue op
|
// need to wait. undo added requests, don't dequeue op
|
||||||
PRIV(read_op)->read_vec.clear();
|
PRIV(read_op)->read_vec.clear();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (fulfilled == read_op->len)
|
if (fulfilled == read_op->len || dirty_it == dirty_db.begin())
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -152,11 +155,17 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
|
||||||
uint64_t bmp_start = 0, bmp_end = 0, bmp_size = block_size/BITMAP_GRANULARITY;
|
uint64_t bmp_start = 0, bmp_end = 0, bmp_size = block_size/BITMAP_GRANULARITY;
|
||||||
while (bmp_start < bmp_size)
|
while (bmp_start < bmp_size)
|
||||||
{
|
{
|
||||||
while (!(clean_entry_bitmap[bmp_start >> 3] & (1 << (bmp_start & 0x7))) && bmp_start < bmp_size)
|
while (!(clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7))) && bmp_end < bmp_size)
|
||||||
{
|
{
|
||||||
bmp_start++;
|
bmp_end++;
|
||||||
}
|
}
|
||||||
bmp_end = bmp_start;
|
if (bmp_end > bmp_start)
|
||||||
|
{
|
||||||
|
// fill with zeroes
|
||||||
|
fulfill_read(read_op, fulfilled, bmp_start * BITMAP_GRANULARITY,
|
||||||
|
bmp_end * BITMAP_GRANULARITY, ST_DEL_STABLE, 0, 0);
|
||||||
|
}
|
||||||
|
bmp_start = bmp_end;
|
||||||
while (clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7)) && bmp_end < bmp_size)
|
while (clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7)) && bmp_end < bmp_size)
|
||||||
{
|
{
|
||||||
bmp_end++;
|
bmp_end++;
|
||||||
|
@ -164,7 +173,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
|
||||||
if (bmp_end > bmp_start)
|
if (bmp_end > bmp_start)
|
||||||
{
|
{
|
||||||
if (!fulfill_read(read_op, fulfilled, bmp_start * BITMAP_GRANULARITY,
|
if (!fulfill_read(read_op, fulfilled, bmp_start * BITMAP_GRANULARITY,
|
||||||
(bmp_end - bmp_start) * BITMAP_GRANULARITY, ST_CURRENT, 0, clean_it->second.location + bmp_start * BITMAP_GRANULARITY))
|
bmp_end * BITMAP_GRANULARITY, ST_CURRENT, 0, clean_it->second.location + bmp_start * BITMAP_GRANULARITY))
|
||||||
{
|
{
|
||||||
// need to wait. undo added requests, don't dequeue op
|
// need to wait. undo added requests, don't dequeue op
|
||||||
PRIV(read_op)->read_vec.clear();
|
PRIV(read_op)->read_vec.clear();
|
||||||
|
@ -175,6 +184,12 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (fulfilled < read_op->len)
|
||||||
|
{
|
||||||
|
// fill remaining parts with zeroes
|
||||||
|
fulfill_read(read_op, fulfilled, 0, block_size, ST_DEL_STABLE, 0, 0);
|
||||||
|
}
|
||||||
|
assert(fulfilled == read_op->len);
|
||||||
if (!PRIV(read_op)->pending_ops)
|
if (!PRIV(read_op)->pending_ops)
|
||||||
{
|
{
|
||||||
// everything is fulfilled from memory
|
// everything is fulfilled from memory
|
||||||
|
@ -183,10 +198,6 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
|
||||||
// region is not allocated - return zeroes
|
// region is not allocated - return zeroes
|
||||||
memset(read_op->buf, 0, read_op->len);
|
memset(read_op->buf, 0, read_op->len);
|
||||||
}
|
}
|
||||||
if (fulfilled != read_op->len)
|
|
||||||
{
|
|
||||||
printf("BUG: fulfilled %lu < %d read bytes\n", fulfilled, read_op->len);
|
|
||||||
}
|
|
||||||
read_op->retval = read_op->len;
|
read_op->retval = read_op->len;
|
||||||
read_op->callback(read_op);
|
read_op->callback(read_op);
|
||||||
return 1;
|
return 1;
|
||||||
|
|
|
@ -101,6 +101,8 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||||
#endif
|
#endif
|
||||||
je->oid = it->oid;
|
je->oid = it->oid;
|
||||||
je->version = it->version;
|
je->version = it->version;
|
||||||
|
je->offset = dirty_db[*it].offset;
|
||||||
|
je->len = dirty_db[*it].len;
|
||||||
je->location = dirty_db[*it].location;
|
je->location = dirty_db[*it].location;
|
||||||
je->crc32 = je_crc32((journal_entry*)je);
|
je->crc32 = je_crc32((journal_entry*)je);
|
||||||
journal.crc32_last = je->crc32;
|
journal.crc32_last = je->crc32;
|
||||||
|
|
Loading…
Reference in New Issue