From db66b3916eb93eb705149cf0c8f3e2c5ba4c3dfb Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov <vitalif@yourcmc.ru>
Date: Wed, 13 Nov 2019 14:04:12 +0300
Subject: [PATCH] Begin metadata read-modify-write

Review fix folded in: the metadata sector read submitted in resume_2
must use io_uring_prep_readv(), not io_uring_prep_writev() — the buffer
is filled FROM disk here (state == 0 means "not read yet") and is only
written back later in resume_6.
---
 blockstore.h          |  9 +++--
 blockstore_init.cpp   |  3 +-
 blockstore_stable.cpp | 90 ++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 95 insertions(+), 7 deletions(-)

diff --git a/blockstore.h b/blockstore.h
index 70f9ece3..09b71284 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -120,16 +120,17 @@ struct __attribute__((__packed__)) clean_disk_entry
 {
     object_id oid;
     uint64_t version;
-    uint8_t flags;
-    uint8_t reserved[7];
+    uint64_t flags;
 };
 
-// 28 bytes per "clean" entry in memory
+#define DISK_ENTRY_STABLE 1
+
+// 24 bytes per "clean" entry in memory
 struct __attribute__((__packed__)) clean_entry
 {
     uint64_t version;
-    uint32_t state;
     uint64_t location;
+    uint32_t state;
 };
 
 // 48 bytes per dirty entry in memory
diff --git a/blockstore_init.cpp b/blockstore_init.cpp
index 7ba57655..08508c2f 100644
--- a/blockstore_init.cpp
+++ b/blockstore_init.cpp
@@ -51,6 +51,7 @@ int blockstore_init_meta::loop()
     {
         assert(!(done_len % sizeof(clean_disk_entry)));
         int count = done_len / sizeof(clean_disk_entry);
+        // FIXME this requires sizeof(clean_disk_entry) to be a divisor of 512
         struct clean_disk_entry *entries = (struct clean_disk_entry*)(metadata_buffer + (prev_done == 1 ? bs->metadata_buf_size : 0));
         // handle entries
         handle_entries(entries, count);
@@ -77,7 +78,7 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int
             allocator_set(bs->data_alloc, done_cnt+i, true);
             bs->clean_db[entries[i].oid] = (struct clean_entry){
                 entries[i].version,
-                (uint32_t)(entries[i].flags ? ST_CURRENT : ST_D_META_SYNCED),
+                (uint32_t)(entries[i].flags & DISK_ENTRY_STABLE ? ST_CURRENT : ST_D_META_SYNCED),
                 done_cnt+i
             };
         }
diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp
index c42d08da..eabf8f6f 100644
--- a/blockstore_stable.cpp
+++ b/blockstore_stable.cpp
@@ -162,6 +162,14 @@ struct copy_buffer_t
     void *buf;
 };
 
+struct meta_sector_t
+{
+    uint64_t offset, len;
+    int state;
+    void *buf;
+    int usage_count;
+};
+
 class journal_flusher_t
 {
     blockstore *bs;
@@ -173,10 +181,12 @@ class journal_flusher_t
    std::map<obj_ver_id, dirty_entry>::iterator dirty_it;
    std::vector<copy_buffer_t> v;
    std::vector<copy_buffer_t>::iterator it;
-    uint64_t offset, len, submit_len, clean_loc;
+    uint64_t offset, len, submit_len, clean_loc, meta_sector, meta_pos;
+    std::map<uint64_t, meta_sector_t>::iterator meta_it;
 
 public:
     journal_flusher_t(int flush_count);
+    std::map<uint64_t, meta_sector_t> meta_sectors;
     std::deque<obj_ver_id> flush_queue;
     void loop();
 };
@@ -191,12 +201,18 @@ void journal_flusher_t::loop()
     // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
     if (wait_state == 1)
         goto resume_1;
+    else if (wait_state == 2)
+        goto resume_2;
     else if (wait_state == 3)
         goto resume_3;
     else if (wait_state == 4)
         goto resume_4;
     else if (wait_state == 5)
         goto resume_5;
+    else if (wait_state == 6)
+        goto resume_6;
+    else if (wait_state == 7)
+        goto resume_7;
     if (!flush_queue.size())
         return;
     cur = flush_queue.front();
@@ -276,6 +292,41 @@ void journal_flusher_t::loop()
             else
                 clean_loc = clean_it->second.location;
         }
+        // Also we need to submit the metadata read. We do a read-modify-write for every operation.
+        // But we must check if the same sector is already in memory.
+        // Another option is to keep all raw metadata in memory all the time. Maybe I'll do it sometime...
+        // And yet another option is to use LSM trees for metadata, but it complicates everything a lot,
+        // so I'll avoid it as long as I can.
+        meta_sector = (clean_loc / (512 / sizeof(clean_disk_entry))) * 512;
+        meta_pos = (clean_loc % (512 / sizeof(clean_disk_entry)));
+        meta_it = meta_sectors.find(meta_sector);
+        if (meta_it == meta_sectors.end())
+        {
+            // Not in memory yet, read it
+            meta_it = meta_sectors.emplace(meta_sector, (meta_sector_t){
+                .offset = meta_sector,
+                .len = 512,
+                .state = 0, // 0 = not read yet
+                .buf = memalign(512, 512),
+                .usage_count = 1,
+            }).first;
+    resume_2:
+            sqe = bs->get_sqe();
+            if (!sqe)
+            {
+                wait_state = 2;
+                return;
+            }
+            data = ((ring_data_t*)sqe->user_data);
+            data->iov = (struct iovec){ meta_it->second.buf, 512 };
+            data->op = this;
+            io_uring_prep_readv(
+                sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
+            );
+            wait_count++;
+        }
+        else
+            meta_it->second.usage_count++;
         wait_state = 3;
     resume_3:
         // After reads complete we submit writes
@@ -299,11 +350,45 @@ void journal_flusher_t::loop()
             );
             wait_count++;
         }
-        wait_state = 5;
+        // And a metadata write
     resume_5:
+        if (meta_it->second.state == 0)
+        {
+            // metadata sector is still being read, wait for it
+            wait_state = 5;
+            return;
+        }
+        *((clean_disk_entry*)meta_it->second.buf + meta_pos) = {
+            .oid = cur.oid,
+            .version = cur.version,
+            .flags = DISK_ENTRY_STABLE,
+        };
+    resume_6:
+        sqe = bs->get_sqe();
+        if (!sqe)
+        {
+            // Can't submit a write, ring is full
+            wait_state = 6;
+            return;
+        }
+        data = ((ring_data_t*)sqe->user_data);
+        data->iov = (struct iovec){ meta_it->second.buf, 512 };
+        data->op = this;
+        io_uring_prep_writev(
+            sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector
+        );
+        wait_count++;
+        wait_state = 7;
+    resume_7:
         // Done, free all buffers
         if (wait_count == 0)
         {
+            meta_it->second.usage_count--;
+            if (meta_it->second.usage_count == 0)
+            {
+                free(meta_it->second.buf);
+                meta_sectors.erase(meta_it);
+            }
             for (it = v.begin(); it != v.end(); it++) {
                 free(it->buf);
             }
@@ -310,6 +395,7 @@ void journal_flusher_t::loop()
             v.clear();
             wait_state = 0;
         }
+        // FIXME Now sync everything
     }
 }
 