Fix journal loading
parent
eb55b2fe20
commit
299b7288d5
2
Makefile
2
Makefile
|
@ -5,7 +5,7 @@ clean:
|
|||
rm -f *.o
|
||||
crc32c.o: crc32c.c
|
||||
g++ -c -o $@ $<
|
||||
%.o: %.cpp blockstore.h
|
||||
%.o: %.cpp allocator.h blockstore_flush.h blockstore.h blockstore_init.h blockstore_journal.h crc32c.h ringloop.h xor.h
|
||||
g++ -g -Wall -Wno-sign-compare -Wno-parentheses -c -o $@ $<
|
||||
test: test.cpp
|
||||
g++ -g -O3 -o test -luring test.cpp
|
||||
|
|
|
@ -184,7 +184,7 @@ resume_0:
|
|||
}
|
||||
else if (dirty_it->second.state == ST_D_STABLE)
|
||||
{
|
||||
// There is an unflushed big write. Overwrite it with small writes
|
||||
// There is an unflushed big write. Copy small writes into its position
|
||||
if (!skip_copy)
|
||||
{
|
||||
clean_loc = dirty_it->second.location;
|
||||
|
|
|
@ -232,11 +232,7 @@ resume_1:
|
|||
wait_state = 2;
|
||||
return 1;
|
||||
}
|
||||
if (wrapped && journal_pos >= bs->journal.used_start)
|
||||
{
|
||||
break;
|
||||
}
|
||||
else
|
||||
if (!wrapped || journal_pos < bs->journal.used_start)
|
||||
{
|
||||
GET_SQE();
|
||||
uint64_t end = bs->journal.len;
|
||||
|
@ -263,8 +259,13 @@ resume_1:
|
|||
return 1;
|
||||
}
|
||||
}
|
||||
if (!submitted)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
printf("Journal entries loaded: %d\n", entries_loaded);
|
||||
free(journal_buffer);
|
||||
bs->journal.crc32_last = crc32_last;
|
||||
journal_buffer = NULL;
|
||||
|
@ -273,27 +274,26 @@ resume_1:
|
|||
|
||||
int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
||||
{
|
||||
uint64_t total_pos = 0;
|
||||
uint64_t buf_pos = 0;
|
||||
if (cur_skip >= 0)
|
||||
{
|
||||
total_pos = cur_skip;
|
||||
buf_pos = cur_skip;
|
||||
cur_skip = 0;
|
||||
}
|
||||
while (total_pos < len)
|
||||
while (buf_pos < len)
|
||||
{
|
||||
total_pos += 512;
|
||||
uint64_t pos = 0;
|
||||
uint64_t proc_pos = buf_pos, pos = 0;
|
||||
buf_pos += 512;
|
||||
while (pos < 512)
|
||||
{
|
||||
journal_entry *je = (journal_entry*)((uint8_t*)buf + total_pos + pos);
|
||||
journal_entry *je = (journal_entry*)((uint8_t*)buf + proc_pos + pos);
|
||||
if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
|
||||
je->type < JE_SMALL_WRITE || je->type > JE_DELETE || started && je->crc32_prev != crc32_last)
|
||||
{
|
||||
if (pos == 0)
|
||||
{
|
||||
// invalid entry at the beginning, this is definitely the end of the journal
|
||||
// FIXME handle the edge case when the journal is full
|
||||
bs->journal.next_free = done_pos + total_pos;
|
||||
bs->journal.next_free = done_pos + proc_pos;
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
|
@ -309,18 +309,26 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
|||
{
|
||||
// oid, version, offset, len
|
||||
uint64_t location;
|
||||
if (cur_skip > 0 || done_pos + total_pos + je->small_write.len > bs->journal.len)
|
||||
if (cur_skip > 0 || done_pos + buf_pos + je->small_write.len > bs->journal.len)
|
||||
{
|
||||
// data continues from the beginning of the journal
|
||||
if (buf_pos > len)
|
||||
{
|
||||
// if something is already skipped, skip everything until the end of the journal
|
||||
buf_pos = bs->journal.len-done_pos;
|
||||
}
|
||||
location = 512 + cur_skip;
|
||||
cur_skip += je->small_write.len;
|
||||
}
|
||||
else
|
||||
{
|
||||
// data is right next
|
||||
location = done_pos + total_pos;
|
||||
// FIXME: OOPS. Please don't modify total_pos here
|
||||
total_pos += je->small_write.len;
|
||||
location = done_pos + buf_pos;
|
||||
buf_pos += je->small_write.len;
|
||||
}
|
||||
if (location != je->small_write.data_offset)
|
||||
{
|
||||
throw new std::runtime_error("BUG: calculated journal data offset != stored journal data offset");
|
||||
}
|
||||
obj_ver_id ov = {
|
||||
.oid = je->small_write.oid,
|
||||
|
@ -332,10 +340,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
|||
.location = location,
|
||||
.offset = je->small_write.offset,
|
||||
.len = je->small_write.len,
|
||||
.journal_sector = total_pos,
|
||||
.journal_sector = proc_pos,
|
||||
});
|
||||
bs->journal.used_sectors[total_pos]++;
|
||||
bs->flusher->queue_flush(ov);
|
||||
bs->journal.used_sectors[proc_pos]++;
|
||||
}
|
||||
else if (je->type == JE_BIG_WRITE)
|
||||
{
|
||||
|
@ -350,10 +357,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
|||
.location = je->big_write.location,
|
||||
.offset = 0,
|
||||
.len = bs->block_size,
|
||||
.journal_sector = total_pos,
|
||||
.journal_sector = proc_pos,
|
||||
});
|
||||
bs->journal.used_sectors[total_pos]++;
|
||||
bs->flusher->queue_flush(ov);
|
||||
bs->journal.used_sectors[proc_pos]++;
|
||||
}
|
||||
else if (je->type == JE_STABLE)
|
||||
{
|
||||
|
@ -374,6 +380,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
|||
it->second.state = (it->second.state == ST_D_META_SYNCED
|
||||
? ST_D_STABLE
|
||||
: (it->second.state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE));
|
||||
bs->flusher->queue_flush(ov);
|
||||
}
|
||||
}
|
||||
else if (je->type == JE_DELETE)
|
||||
|
@ -389,16 +396,16 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
|||
.location = 0,
|
||||
.offset = 0,
|
||||
.len = 0,
|
||||
.journal_sector = total_pos,
|
||||
.journal_sector = proc_pos,
|
||||
});
|
||||
bs->journal.used_sectors[total_pos]++;
|
||||
bs->flusher->queue_flush(ov);
|
||||
bs->journal.used_sectors[proc_pos]++;
|
||||
}
|
||||
entries_loaded++;
|
||||
}
|
||||
}
|
||||
if (cur_skip == 0 && total_pos > len)
|
||||
if (buf_pos > len)
|
||||
{
|
||||
cur_skip = total_pos - len;
|
||||
cur_skip = buf_pos - len;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ class blockstore_init_journal
|
|||
{
|
||||
blockstore *bs;
|
||||
int wait_state = 0, wait_count = 0;
|
||||
int entries_loaded = 0;
|
||||
uint8_t *journal_buffer = NULL;
|
||||
uint32_t crc32_last = 0;
|
||||
bool started = false;
|
||||
|
|
|
@ -37,7 +37,9 @@ struct __attribute__((__packed__)) journal_entry_small_write
|
|||
uint64_t version;
|
||||
uint32_t offset;
|
||||
uint32_t len;
|
||||
// small_write entries contain <len> bytes of data, but data is stored in the next journal sector
|
||||
// small_write entries contain <len> bytes of data which is stored in the next sectors
|
||||
// data_offset is the offset of that data within the journal
|
||||
uint64_t data_offset;
|
||||
};
|
||||
|
||||
struct __attribute__((__packed__)) journal_entry_big_write
|
||||
|
|
|
@ -132,16 +132,18 @@ int blockstore::dequeue_write(blockstore_operation *op)
|
|||
prefill_single_journal_entry(journal, JE_SMALL_WRITE, sizeof(struct journal_entry_small_write));
|
||||
dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
|
||||
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
|
||||
// Figure out where data will be
|
||||
journal.next_free = (journal.next_free + op->len) < journal.len ? journal.next_free : 512;
|
||||
je->oid = op->oid;
|
||||
je->version = op->version;
|
||||
je->offset = op->offset;
|
||||
je->len = op->len;
|
||||
je->data_offset = journal.next_free;
|
||||
je->crc32 = je_crc32((journal_entry*)je);
|
||||
journal.crc32_last = je->crc32;
|
||||
prepare_journal_sector_write(journal, sqe1, cb);
|
||||
op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector;
|
||||
// Prepare journal data write
|
||||
journal.next_free = (journal.next_free + op->len) < journal.len ? journal.next_free : 512;
|
||||
data2->iov = (struct iovec){ op->buf, op->len };
|
||||
data2->callback = cb;
|
||||
my_uring_prep_writev(
|
||||
|
|
Loading…
Reference in New Issue