forked from vitalif/vitastor
Begin metadata read-modify-write
parent
ae77a228c7
commit
db66b3916e
|
@ -120,16 +120,17 @@ struct __attribute__((__packed__)) clean_disk_entry
|
||||||
{
|
{
|
||||||
object_id oid;
|
object_id oid;
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
uint8_t flags;
|
uint64_t flags;
|
||||||
uint8_t reserved[7];
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// 28 bytes per "clean" entry in memory
|
#define DISK_ENTRY_STABLE 1
|
||||||
|
|
||||||
|
// 24 bytes per "clean" entry in memory
|
||||||
struct __attribute__((__packed__)) clean_entry
|
struct __attribute__((__packed__)) clean_entry
|
||||||
{
|
{
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
uint32_t state;
|
|
||||||
uint64_t location;
|
uint64_t location;
|
||||||
|
uint32_t state;
|
||||||
};
|
};
|
||||||
|
|
||||||
// 48 bytes per dirty entry in memory
|
// 48 bytes per dirty entry in memory
|
||||||
|
|
|
@ -51,6 +51,7 @@ int blockstore_init_meta::loop()
|
||||||
{
|
{
|
||||||
assert(!(done_len % sizeof(clean_disk_entry)));
|
assert(!(done_len % sizeof(clean_disk_entry)));
|
||||||
int count = done_len / sizeof(clean_disk_entry);
|
int count = done_len / sizeof(clean_disk_entry);
|
||||||
|
// FIXME this requires sizeof(clean_disk_entry) to be a divisor of 512
|
||||||
struct clean_disk_entry *entries = (struct clean_disk_entry*)(metadata_buffer + (prev_done == 1 ? bs->metadata_buf_size : 0));
|
struct clean_disk_entry *entries = (struct clean_disk_entry*)(metadata_buffer + (prev_done == 1 ? bs->metadata_buf_size : 0));
|
||||||
// handle <count> entries
|
// handle <count> entries
|
||||||
handle_entries(entries, count);
|
handle_entries(entries, count);
|
||||||
|
@ -77,7 +78,7 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int
|
||||||
allocator_set(bs->data_alloc, done_cnt+i, true);
|
allocator_set(bs->data_alloc, done_cnt+i, true);
|
||||||
bs->clean_db[entries[i].oid] = (struct clean_entry){
|
bs->clean_db[entries[i].oid] = (struct clean_entry){
|
||||||
entries[i].version,
|
entries[i].version,
|
||||||
(uint32_t)(entries[i].flags ? ST_CURRENT : ST_D_META_SYNCED),
|
(uint32_t)(entries[i].flags & DISK_ENTRY_STABLE ? ST_CURRENT : ST_D_META_SYNCED),
|
||||||
done_cnt+i
|
done_cnt+i
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,6 +162,14 @@ struct copy_buffer_t
|
||||||
void *buf;
|
void *buf;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct meta_sector_t
|
||||||
|
{
|
||||||
|
uint64_t offset, len;
|
||||||
|
int state;
|
||||||
|
void *buf;
|
||||||
|
int usage_count;
|
||||||
|
};
|
||||||
|
|
||||||
class journal_flusher_t
|
class journal_flusher_t
|
||||||
{
|
{
|
||||||
blockstore *bs;
|
blockstore *bs;
|
||||||
|
@ -173,10 +181,12 @@ class journal_flusher_t
|
||||||
std::map<obj_ver_id, dirty_entry>::iterator dirty_it;
|
std::map<obj_ver_id, dirty_entry>::iterator dirty_it;
|
||||||
std::vector<copy_buffer_t> v;
|
std::vector<copy_buffer_t> v;
|
||||||
std::vector<copy_buffer_t>::iterator it;
|
std::vector<copy_buffer_t>::iterator it;
|
||||||
uint64_t offset, len, submit_len, clean_loc;
|
uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos;
|
||||||
|
std::map<uint64_t, meta_sector_t>::iterator meta_it;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
journal_flusher_t(int flush_count);
|
journal_flusher_t(int flush_count);
|
||||||
|
std::map<uint64_t, meta_sector_t> meta_sectors;
|
||||||
std::deque<obj_ver_id> flush_queue;
|
std::deque<obj_ver_id> flush_queue;
|
||||||
void loop();
|
void loop();
|
||||||
};
|
};
|
||||||
|
@ -191,12 +201,18 @@ void journal_flusher_t::loop()
|
||||||
// Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
|
// Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
|
||||||
if (wait_state == 1)
|
if (wait_state == 1)
|
||||||
goto resume_1;
|
goto resume_1;
|
||||||
|
else if (wait_state == 2)
|
||||||
|
goto resume_2;
|
||||||
else if (wait_state == 3)
|
else if (wait_state == 3)
|
||||||
goto resume_3;
|
goto resume_3;
|
||||||
else if (wait_state == 4)
|
else if (wait_state == 4)
|
||||||
goto resume_4;
|
goto resume_4;
|
||||||
else if (wait_state == 5)
|
else if (wait_state == 5)
|
||||||
goto resume_5;
|
goto resume_5;
|
||||||
|
else if (wait_state == 6)
|
||||||
|
goto resume_6;
|
||||||
|
else if (wait_state == 7)
|
||||||
|
goto resume_7;
|
||||||
if (!flush_queue.size())
|
if (!flush_queue.size())
|
||||||
return;
|
return;
|
||||||
cur = flush_queue.front();
|
cur = flush_queue.front();
|
||||||
|
@ -276,6 +292,41 @@ void journal_flusher_t::loop()
|
||||||
else
|
else
|
||||||
clean_loc = clean_it->second.location;
|
clean_loc = clean_it->second.location;
|
||||||
}
|
}
|
||||||
|
// Also we need to submit the metadata read. We do a read-modify-write for every operation.
|
||||||
|
// But we must check if the same sector is already in memory.
|
||||||
|
// Another option is to keep all raw metadata in memory all the time. Maybe I'll do it sometime...
|
||||||
|
// And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot,
|
||||||
|
// so I'll avoid it as long as I can.
|
||||||
|
meta_sector = (clean_loc / (512 / sizeof(clean_disk_entry))) * 512;
|
||||||
|
meta_pos = (clean_loc % (512 / sizeof(clean_disk_entry)));
|
||||||
|
meta_it = meta_sectors.find(meta_sector);
|
||||||
|
if (meta_it == meta_sectors.end())
|
||||||
|
{
|
||||||
|
// Not in memory yet, read it
|
||||||
|
meta_it = meta_sectors.emplace(meta_sector, (meta_sector_t){
|
||||||
|
.offset = meta_sector,
|
||||||
|
.len = 512,
|
||||||
|
.state = 0, // 0 = not read yet
|
||||||
|
.buf = memalign(512, 512),
|
||||||
|
.usage_count = 1,
|
||||||
|
}).first;
|
||||||
|
resume_2:
|
||||||
|
sqe = bs->get_sqe();
|
||||||
|
if (!sqe)
|
||||||
|
{
|
||||||
|
wait_state = 2;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
data = ((ring_data_t*)sqe->user_data);
|
||||||
|
data->iov = (struct iovec){ meta_it->second.buf, 512 };
|
||||||
|
data->op = this;
|
||||||
|
io_uring_prep_writev(
|
||||||
|
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
|
||||||
|
);
|
||||||
|
wait_count++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
meta_it->second.usage_count++;
|
||||||
wait_state = 3;
|
wait_state = 3;
|
||||||
resume_3:
|
resume_3:
|
||||||
// After reads complete we submit writes
|
// After reads complete we submit writes
|
||||||
|
@ -299,11 +350,45 @@ void journal_flusher_t::loop()
|
||||||
);
|
);
|
||||||
wait_count++;
|
wait_count++;
|
||||||
}
|
}
|
||||||
wait_state = 5;
|
// And a metadata write
|
||||||
resume_5:
|
resume_5:
|
||||||
|
if (meta_it->second.state == 0)
|
||||||
|
{
|
||||||
|
// metadata sector is still being read, wait for it
|
||||||
|
wait_state = 5;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
*((clean_disk_entry*)meta_it->second.buf + meta_pos) = {
|
||||||
|
.oid = cur.oid,
|
||||||
|
.version = cur.version,
|
||||||
|
.flags = DISK_ENTRY_STABLE,
|
||||||
|
};
|
||||||
|
resume_6:
|
||||||
|
sqe = bs->get_sqe();
|
||||||
|
if (!sqe)
|
||||||
|
{
|
||||||
|
// Can't submit a write, ring is full
|
||||||
|
wait_state = 6;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
data = ((ring_data_t*)sqe->user_data);
|
||||||
|
data->iov = (struct iovec){ meta_it->second.buf, 512 };
|
||||||
|
data->op = this;
|
||||||
|
io_uring_prep_writev(
|
||||||
|
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
|
||||||
|
);
|
||||||
|
wait_count++;
|
||||||
|
wait_state = 7;
|
||||||
|
resume_7:
|
||||||
// Done, free all buffers
|
// Done, free all buffers
|
||||||
if (wait_count == 0)
|
if (wait_count == 0)
|
||||||
{
|
{
|
||||||
|
meta_it->second.usage_count--;
|
||||||
|
if (meta_it->second.usage_count == 0)
|
||||||
|
{
|
||||||
|
free(meta_it->second.buf);
|
||||||
|
meta_sectors.erase(meta_it);
|
||||||
|
}
|
||||||
for (it = v.begin(); it != v.end(); it++)
|
for (it = v.begin(); it != v.end(); it++)
|
||||||
{
|
{
|
||||||
free(it->buf);
|
free(it->buf);
|
||||||
|
@ -311,6 +396,7 @@ void journal_flusher_t::loop()
|
||||||
v.clear();
|
v.clear();
|
||||||
wait_state = 0;
|
wait_state = 0;
|
||||||
}
|
}
|
||||||
|
// FIXME Now sync everything
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue