Rename dirty_entry.size to len, remove some unneeded methods and fields, read metadata correctly
parent
1c6b9778a4
commit
bc549553e4
17
blockstore.h
17
blockstore.h
|
@ -114,26 +114,22 @@ inline bool operator < (const object_id & a, const object_id & b)
|
||||||
return a.inode < b.inode || a.inode == b.inode && a.stripe < b.stripe;
|
return a.inode < b.inode || a.inode == b.inode && a.stripe < b.stripe;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 32 bytes per "clean" entry on disk with fixed metadata tables
|
// 24 bytes per "clean" entry on disk with fixed metadata tables
|
||||||
// FIXME: maybe add crc32's to metadata
|
// FIXME: maybe add crc32's to metadata
|
||||||
struct __attribute__((__packed__)) clean_disk_entry
|
struct __attribute__((__packed__)) clean_disk_entry
|
||||||
{
|
{
|
||||||
object_id oid;
|
object_id oid;
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
uint64_t flags;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#define DISK_ENTRY_STABLE 1
|
// 32 = 16 + 16 bytes per "clean" entry in memory (object_id => clean_entry)
|
||||||
|
|
||||||
// 24 bytes per "clean" entry in memory
|
|
||||||
struct __attribute__((__packed__)) clean_entry
|
struct __attribute__((__packed__)) clean_entry
|
||||||
{
|
{
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
uint64_t location;
|
uint64_t location;
|
||||||
uint32_t state;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// 48 bytes per dirty entry in memory
|
// 48 = 24 + 24 bytes per dirty entry in memory (obj_ver_id => dirty_entry)
|
||||||
struct __attribute__((__packed__)) obj_ver_id
|
struct __attribute__((__packed__)) obj_ver_id
|
||||||
{
|
{
|
||||||
object_id oid;
|
object_id oid;
|
||||||
|
@ -148,10 +144,10 @@ inline bool operator < (const obj_ver_id & a, const obj_ver_id & b)
|
||||||
struct __attribute__((__packed__)) dirty_entry
|
struct __attribute__((__packed__)) dirty_entry
|
||||||
{
|
{
|
||||||
uint32_t state;
|
uint32_t state;
|
||||||
uint32_t flags;
|
uint32_t flags; // unneeded, but present for alignment
|
||||||
uint64_t location; // location in either journal or data
|
uint64_t location; // location in either journal or data
|
||||||
uint32_t offset; // offset within stripe
|
uint32_t offset; // offset within stripe
|
||||||
uint32_t size; // entry size. FIXME: rename to len?
|
uint32_t len; // data length
|
||||||
};
|
};
|
||||||
|
|
||||||
class oid_hash
|
class oid_hash
|
||||||
|
@ -274,7 +270,7 @@ class blockstore
|
||||||
|
|
||||||
inline struct io_uring_sqe* get_sqe()
|
inline struct io_uring_sqe* get_sqe()
|
||||||
{
|
{
|
||||||
return ringloop->get_sqe(ring_consumer.number);
|
return ringloop->get_sqe();
|
||||||
}
|
}
|
||||||
|
|
||||||
friend class blockstore_init_meta;
|
friend class blockstore_init_meta;
|
||||||
|
@ -317,7 +313,6 @@ class blockstore
|
||||||
|
|
||||||
// Stabilize
|
// Stabilize
|
||||||
int dequeue_stable(blockstore_operation *op);
|
int dequeue_stable(blockstore_operation *op);
|
||||||
int continue_stable(blockstore_operation *op);
|
|
||||||
void handle_stable_event(ring_data_t *data, blockstore_operation *op);
|
void handle_stable_event(ring_data_t *data, blockstore_operation *op);
|
||||||
void stabilize_object(object_id oid, uint64_t max_ver);
|
void stabilize_object(object_id oid, uint64_t max_ver);
|
||||||
|
|
||||||
|
|
|
@ -67,7 +67,7 @@ void journal_flusher_co::loop()
|
||||||
{
|
{
|
||||||
// First we submit all reads
|
// First we submit all reads
|
||||||
offset = dirty_it->second.offset;
|
offset = dirty_it->second.offset;
|
||||||
len = dirty_it->second.size;
|
len = dirty_it->second.len;
|
||||||
it = v.begin();
|
it = v.begin();
|
||||||
while (1)
|
while (1)
|
||||||
{
|
{
|
||||||
|
@ -161,7 +161,7 @@ void journal_flusher_co::loop()
|
||||||
data->iov = (struct iovec){ meta_it->second.buf, 512 };
|
data->iov = (struct iovec){ meta_it->second.buf, 512 };
|
||||||
data->callback = [this](ring_data_t* data)
|
data->callback = [this](ring_data_t* data)
|
||||||
{
|
{
|
||||||
|
meta_it->second.state = 1;
|
||||||
wait_count--;
|
wait_count--;
|
||||||
};
|
};
|
||||||
io_uring_prep_writev(
|
io_uring_prep_writev(
|
||||||
|
@ -208,7 +208,6 @@ void journal_flusher_co::loop()
|
||||||
*((clean_disk_entry*)meta_it->second.buf + meta_pos) = {
|
*((clean_disk_entry*)meta_it->second.buf + meta_pos) = {
|
||||||
.oid = cur.oid,
|
.oid = cur.oid,
|
||||||
.version = cur.version,
|
.version = cur.version,
|
||||||
.flags = DISK_ENTRY_STABLE,
|
|
||||||
};
|
};
|
||||||
resume_6:
|
resume_6:
|
||||||
sqe = bs->get_sqe();
|
sqe = bs->get_sqe();
|
||||||
|
|
|
@ -50,13 +50,14 @@ int blockstore_init_meta::loop()
|
||||||
}
|
}
|
||||||
if (prev_done)
|
if (prev_done)
|
||||||
{
|
{
|
||||||
assert(!(done_len % sizeof(clean_disk_entry)));
|
int count = 512 / sizeof(clean_disk_entry);
|
||||||
int count = done_len / sizeof(clean_disk_entry);
|
for (int sector = 0; sector < done_len; sector += 512)
|
||||||
// FIXME this requires sizeof(clean_disk_entry) to be a divisor of 512
|
{
|
||||||
struct clean_disk_entry *entries = (struct clean_disk_entry*)(metadata_buffer + (prev_done == 1 ? bs->metadata_buf_size : 0));
|
clean_disk_entry *entries = (clean_disk_entry*)(metadata_buffer + (prev_done == 1 ? bs->metadata_buf_size : 0) + sector);
|
||||||
// handle <count> entries
|
// handle <count> entries
|
||||||
handle_entries(entries, count);
|
handle_entries(entries, count, bs->block_order);
|
||||||
done_cnt += count;
|
done_cnt += count;
|
||||||
|
}
|
||||||
prev_done = 0;
|
prev_done = 0;
|
||||||
done_len = 0;
|
done_len = 0;
|
||||||
}
|
}
|
||||||
|
@ -70,7 +71,7 @@ int blockstore_init_meta::loop()
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int count)
|
void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int count, int block_order)
|
||||||
{
|
{
|
||||||
for (unsigned i = 0; i < count; i++)
|
for (unsigned i = 0; i < count; i++)
|
||||||
{
|
{
|
||||||
|
@ -78,9 +79,8 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int
|
||||||
{
|
{
|
||||||
allocator_set(bs->data_alloc, done_cnt+i, true);
|
allocator_set(bs->data_alloc, done_cnt+i, true);
|
||||||
bs->clean_db[entries[i].oid] = (struct clean_entry){
|
bs->clean_db[entries[i].oid] = (struct clean_entry){
|
||||||
entries[i].version,
|
.version = entries[i].version,
|
||||||
(uint32_t)(entries[i].flags & DISK_ENTRY_STABLE ? ST_CURRENT : ST_D_META_SYNCED),
|
.location = (done_cnt+i) << block_order,
|
||||||
done_cnt+i
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -302,7 +302,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
||||||
.flags = 0,
|
.flags = 0,
|
||||||
.location = location,
|
.location = location,
|
||||||
.offset = je->small_write.offset,
|
.offset = je->small_write.offset,
|
||||||
.size = je->small_write.len,
|
.len = je->small_write.len,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else if (je->type == JE_BIG_WRITE)
|
else if (je->type == JE_BIG_WRITE)
|
||||||
|
@ -316,7 +316,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
||||||
.flags = 0,
|
.flags = 0,
|
||||||
.location = je->big_write.location,
|
.location = je->big_write.location,
|
||||||
.offset = 0,
|
.offset = 0,
|
||||||
.size = bs->block_size,
|
.len = bs->block_size,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else if (je->type == JE_STABLE)
|
else if (je->type == JE_STABLE)
|
||||||
|
@ -350,7 +350,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
|
||||||
.flags = 0,
|
.flags = 0,
|
||||||
.location = 0,
|
.location = 0,
|
||||||
.offset = 0,
|
.offset = 0,
|
||||||
.size = 0,
|
.len = 0,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ class blockstore_init_meta
|
||||||
uint8_t *metadata_buffer = NULL;
|
uint8_t *metadata_buffer = NULL;
|
||||||
uint64_t metadata_read = 0;
|
uint64_t metadata_read = 0;
|
||||||
int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0;
|
int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0;
|
||||||
void handle_entries(struct clean_disk_entry* entries, int count);
|
void handle_entries(struct clean_disk_entry* entries, int count, int block_order);
|
||||||
void handle_event(ring_data_t *data);
|
void handle_event(ring_data_t *data);
|
||||||
public:
|
public:
|
||||||
blockstore_init_meta(blockstore *bs);
|
blockstore_init_meta(blockstore *bs);
|
||||||
|
|
|
@ -104,7 +104,7 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
|
||||||
}
|
}
|
||||||
if (version_ok)
|
if (version_ok)
|
||||||
{
|
{
|
||||||
if (!fulfill_read(read_op, fulfilled, dirty.offset, dirty.offset + dirty.size,
|
if (!fulfill_read(read_op, fulfilled, dirty.offset, dirty.offset + dirty.len,
|
||||||
dirty.state, dirty_it->first.version, dirty.location))
|
dirty.state, dirty_it->first.version, dirty.location))
|
||||||
{
|
{
|
||||||
// need to wait. undo added requests, don't dequeue op
|
// need to wait. undo added requests, don't dequeue op
|
||||||
|
|
|
@ -108,11 +108,6 @@ int blockstore::dequeue_stable(blockstore_operation *op)
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int blockstore::continue_stable(blockstore_operation *op)
|
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op)
|
void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op)
|
||||||
{
|
{
|
||||||
if (data->res < 0)
|
if (data->res < 0)
|
||||||
|
|
|
@ -33,7 +33,7 @@ void blockstore::enqueue_write(blockstore_operation *op)
|
||||||
.flags = 0,
|
.flags = 0,
|
||||||
.location = 0,
|
.location = 0,
|
||||||
.offset = op->offset,
|
.offset = op->offset,
|
||||||
.size = op->len,
|
.len = op->len,
|
||||||
});
|
});
|
||||||
// Remember write as unsynced here, so external consumers could get
|
// Remember write as unsynced here, so external consumers could get
|
||||||
// the list of dirty objects to sync just before issuing a SYNC request
|
// the list of dirty objects to sync just before issuing a SYNC request
|
||||||
|
|
12
ringloop.cpp
12
ringloop.cpp
|
@ -29,18 +29,6 @@ struct io_uring_sqe* ring_loop_t::get_sqe()
|
||||||
return sqe;
|
return sqe;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct io_uring_sqe* ring_loop_t::get_sqe(int consumer)
|
|
||||||
{
|
|
||||||
struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
|
|
||||||
if (sqe)
|
|
||||||
{
|
|
||||||
struct ring_data_t *data = ring_data + (sqe - ring->sq.sqes);
|
|
||||||
io_uring_sqe_set_data(sqe, data);
|
|
||||||
data->source = consumer;
|
|
||||||
}
|
|
||||||
return sqe;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ring_loop_t::register_consumer(ring_consumer_t & consumer)
|
int ring_loop_t::register_consumer(ring_consumer_t & consumer)
|
||||||
{
|
{
|
||||||
consumer.number = consumers.size();
|
consumer.number = consumers.size();
|
||||||
|
|
|
@ -12,7 +12,6 @@
|
||||||
|
|
||||||
struct ring_data_t
|
struct ring_data_t
|
||||||
{
|
{
|
||||||
uint64_t source;
|
|
||||||
struct iovec iov; // for single-entry read/write operations
|
struct iovec iov; // for single-entry read/write operations
|
||||||
int res;
|
int res;
|
||||||
std::function<void(ring_data_t*)> callback;
|
std::function<void(ring_data_t*)> callback;
|
||||||
|
@ -33,7 +32,6 @@ public:
|
||||||
ring_loop_t(int qd);
|
ring_loop_t(int qd);
|
||||||
~ring_loop_t();
|
~ring_loop_t();
|
||||||
struct io_uring_sqe* get_sqe();
|
struct io_uring_sqe* get_sqe();
|
||||||
struct io_uring_sqe* get_sqe(int consumer);
|
|
||||||
int register_consumer(ring_consumer_t & consumer);
|
int register_consumer(ring_consumer_t & consumer);
|
||||||
void unregister_consumer(int number);
|
void unregister_consumer(int number);
|
||||||
void loop(bool sleep);
|
void loop(bool sleep);
|
||||||
|
|
Loading…
Reference in New Issue