Remove the hardcoded EC(2+1) scheme (EC(k+1) is now supported) and fix some bugs

blocking-uring-test
Vitaliy Filippov 2020-02-13 19:13:16 +03:00
parent b7ccd63104
commit ffe073473a
13 changed files with 183 additions and 95 deletions

View File

@ -37,8 +37,9 @@
/* BS_OP_LIST:
Input:
- len = divisor
- offset = modulo. object is listed if (object_id % len) == offset.
- oid.stripe = parity block size
- len = PG count or 0 to list all objects
- offset = PG number
Output:
- retval = total obj_ver_id count

View File

@ -312,6 +312,10 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
op->callback(op);
return;
}
if (op->opcode == BS_OP_WRITE && !enqueue_write(op))
{
return;
}
// Call constructor without allocating memory. We'll call destructor before returning op back
new ((void*)op->private_data) blockstore_op_private_t;
PRIV(op)->wait_for = 0;
@ -325,22 +329,28 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
{
submit_queue.push_front(op);
}
if (op->opcode == BS_OP_WRITE)
{
enqueue_write(op);
}
ringloop->wakeup();
}
void blockstore_impl_t::process_list(blockstore_op_t *op)
{
// Count objects
uint32_t list_pg = op->offset;
uint32_t pg_count = op->len;
uint64_t parity_block_size = op->oid.stripe;
if (pg_count != 0 && (parity_block_size < MIN_BLOCK_SIZE || list_pg >= pg_count))
{
op->retval = -EINVAL;
FINISH_OP(op);
return;
}
uint64_t stable_count = 0;
if (op->len)
if (pg_count > 0)
{
for (auto it = clean_db.begin(); it != clean_db.end(); it++)
{
if ((it->first % op->len) == op->offset)
uint32_t pg = (it->first.inode + it->first.stripe / parity_block_size) % pg_count;
if (pg == list_pg)
{
stable_count++;
}
@ -353,7 +363,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
uint64_t total_count = stable_count;
for (auto it = dirty_db.begin(); it != dirty_db.end(); it++)
{
if (!op->len || (it->first.oid % op->len) == op->offset)
if (!pg_count || ((it->first.oid.inode + it->first.oid.stripe / parity_block_size) % pg_count) == list_pg)
{
if (IS_STABLE(it->second.state))
{
@ -369,13 +379,14 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
if (!op->buf)
{
op->retval = -ENOMEM;
FINISH_OP(op);
return;
}
obj_ver_id *vers = (obj_ver_id*)op->buf;
int i = 0;
for (auto it = clean_db.begin(); it != clean_db.end(); it++)
{
if (!op->len || (it->first % op->len) == op->offset)
if (!pg_count || ((it->first.inode + it->first.stripe / parity_block_size) % pg_count) == list_pg)
{
vers[i++] = {
.oid = it->first,
@ -386,7 +397,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
int j = stable_count;
for (auto it = dirty_db.begin(); it != dirty_db.end(); it++)
{
if (!op->len || (it->first.oid % op->len) == op->offset)
if (!pg_count || ((it->first.oid.inode + it->first.oid.stripe / parity_block_size) % pg_count) == list_pg)
{
if (IS_STABLE(it->second.state))
{

View File

@ -258,7 +258,7 @@ class blockstore_impl_t
void handle_read_event(ring_data_t *data, blockstore_op_t *op);
// Write
void enqueue_write(blockstore_op_t *op);
bool enqueue_write(blockstore_op_t *op);
int dequeue_write(blockstore_op_t *op);
int dequeue_del(blockstore_op_t *op);
void ack_write(blockstore_op_t *op);

View File

@ -1,6 +1,6 @@
#include "blockstore_impl.h"
void blockstore_impl_t::enqueue_write(blockstore_op_t *op)
bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
{
// Check or assign version number
bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE);
@ -40,14 +40,14 @@ void blockstore_impl_t::enqueue_write(blockstore_op_t *op)
// Invalid version requested
op->retval = -EINVAL;
FINISH_OP(op);
return;
return false;
}
if (deleted && is_del)
{
// Already deleted
op->retval = 0;
FINISH_OP(op);
return;
return false;
}
// Immediately add the operation into dirty_db, so subsequent reads could see it
#ifdef BLOCKSTORE_DEBUG
@ -68,6 +68,7 @@ void blockstore_impl_t::enqueue_write(blockstore_op_t *op)
.len = is_del ? 0 : op->len,
.journal_sector = 0,
});
return true;
}
// First step of the write algorithm: dequeue operation and submit initial write(s)

View File

@ -2,23 +2,15 @@
#include <stdint.h>
// Max 64 replicas
#define STRIPE_MASK 0x3F
#define STRIPE_SHIFT 6
// 16 bytes per object/stripe id
// stripe includes replica number in 6 (or maybe 4, see above) least significant bits
// stripe = (start of the parity stripe + peer role)
// i.e. for example (256KB + one of 0,1,2)
// Identifier of a single stored object: a pair of 64-bit numbers.
// The struct is packed, so it holds exactly the two fields below with no padding (16 bytes).
struct __attribute__((__packed__)) object_id
{
// Inode number part of the identifier
uint64_t inode;
// Stripe part of the identifier.
// NOTE(review): nearby comments describe it as the start of the parity stripe plus
// the peer role stored in the low bits - confirm the exact encoding against the users of this field.
uint64_t stripe;
};
// Reduces an object id to a number in [0, b).
// The inode remainder is scaled by (2^32 mod b) twice and added to the remainder of the
// stripe value with its low STRIPE_SHIFT bits dropped; the sum is then reduced modulo b again.
// All arithmetic is plain 64-bit unsigned arithmetic, so the intermediate product wraps for large b.
// b must be non-zero.
inline uint64_t operator % (const object_id & a, const uint64_t b)
{
    const uint64_t inode_rem = a.inode % b;
    const uint64_t word_rem = 0x100000000 % b;
    const uint64_t stripe_rem = (a.stripe >> STRIPE_SHIFT) % b;
    const uint64_t mixed = inode_rem * word_rem * word_rem + stripe_rem;
    return mixed % b;
}
inline bool operator == (const object_id & a, const object_id & b)
{
return a.inode == b.inode && a.stripe == b.stripe;

6
osd.h
View File

@ -10,6 +10,7 @@
#include <arpa/inet.h>
#include <malloc.h>
#include <set>
#include <deque>
#include "blockstore.h"
@ -149,6 +150,9 @@ struct osd_client_t
// Outbound messages (replies or requests)
std::deque<osd_op_t*> outbox;
// PGs dirtied by this client's primary-writes
std::set<pg_num_t> dirty_pgs;
// Write state
osd_op_t *write_op = NULL;
iovec write_iov;
@ -188,6 +192,7 @@ class osd_t
int inflight_ops = 0;
blockstore_t *bs;
uint32_t bs_block_size, bs_disk_alignment;
uint64_t parity_block_size = 4*1024*1024; // 4 MB by default
ring_loop_t *ringloop;
timerfd_interval *tick_tfd;
@ -239,7 +244,6 @@ class osd_t
void exec_primary_read(osd_op_t *cur_op);
void exec_primary_write(osd_op_t *cur_op);
void exec_primary_sync(osd_op_t *cur_op);
void make_primary_reply(osd_op_t *op);
void finish_primary_op(osd_op_t *cur_op, int retval);
void handle_primary_read_subop(osd_op_t *cur_op, int ok);
int extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size);

View File

@ -51,15 +51,16 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
}
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST)
{
if (cur_op->op.sec_list.pgtotal < cur_op->op.sec_list.pgnum)
if (cur_op->op.sec_list.pg_count < cur_op->op.sec_list.list_pg)
{
// requested pg number is greater than total pg count
cur_op->bs_op.retval = -EINVAL;
secondary_op_callback(cur_op);
return;
}
cur_op->bs_op.len = cur_op->op.sec_list.pgtotal;
cur_op->bs_op.offset = cur_op->op.sec_list.pgnum - 1;
cur_op->bs_op.oid.stripe = cur_op->op.sec_list.parity_block_size;
cur_op->bs_op.len = cur_op->op.sec_list.pg_count;
cur_op->bs_op.offset = cur_op->op.sec_list.list_pg - 1;
}
#ifdef OSD_STUB
cur_op->bs_op.retval = cur_op->bs_op.len;

View File

@ -131,7 +131,8 @@ struct __attribute__((__packed__)) osd_op_secondary_list_t
{
osd_op_header_t header;
// number of the placement group to list and total placement group count
pg_num_t pgnum, pgtotal;
pg_num_t list_pg, pg_count;
uint64_t parity_block_size;
};
struct __attribute__((__packed__)) osd_reply_secondary_list_t

View File

@ -234,18 +234,18 @@ void osd_t::start_pg_peering(int pg_idx)
if (pg.peering_state)
{
// Adjust the peering operation that's still in progress
for (auto & p: pg.peering_state->list_ops)
for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end(); it++)
{
int role;
for (role = 0; role < pg.cur_set.size(); role++)
{
if (pg.cur_set[role] == p.first)
if (pg.cur_set[role] == it->first)
break;
}
if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size())
{
// Discard the result after completion, which, chances are, will be unsuccessful
auto list_op = p.second;
auto list_op = it->second;
if (list_op->peer_fd == 0)
{
// Self
@ -264,7 +264,8 @@ void osd_t::start_pg_peering(int pg_idx)
delete list_op;
};
}
pg.peering_state->list_ops.erase(p.first);
pg.peering_state->list_ops.erase(it);
it = pg.peering_state->list_ops.begin();
}
}
for (auto & p: pg.peering_state->list_results)
@ -315,6 +316,9 @@ void osd_t::start_pg_peering(int pg_idx)
op->op_type = 0;
op->peer_fd = 0;
op->bs_op.opcode = BS_OP_LIST;
op->bs_op.oid.stripe = parity_block_size;
op->bs_op.len = pg_count,
op->bs_op.offset = pg.pg_num-1,
op->bs_op.callback = [ps, op, role_osd](blockstore_op_t *bs_op)
{
if (op->bs_op.retval < 0)
@ -351,19 +355,19 @@ void osd_t::start_pg_peering(int pg_idx)
.id = this->next_subop_id++,
.opcode = OSD_OP_SECONDARY_LIST,
},
.pgnum = pg.pg_num,
.pgtotal = pg_count,
.list_pg = pg.pg_num,
.pg_count = pg_count,
.parity_block_size = parity_block_size,
},
};
op->callback = [this, ps, role_osd](osd_op_t *op)
{
if (op->reply.hdr.retval < 0)
{
int peer_fd = op->peer_fd;
printf("Failed to get object list from OSD %lu, disconnecting peer\n", role_osd);
delete op;
printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval);
ps->list_ops.erase(role_osd);
stop_client(peer_fd);
stop_client(op->peer_fd);
delete op;
return;
}
printf(

View File

@ -170,7 +170,7 @@ void pg_t::calc_object_states()
for (int i = 0; i < all.size(); i++)
{
if (st.oid.inode != all[i].oid.inode ||
st.oid.stripe != (all[i].oid.stripe >> STRIPE_SHIFT))
st.oid.stripe != (all[i].oid.stripe & ~STRIPE_MASK))
{
if (st.oid.inode != 0)
{
@ -179,7 +179,7 @@ void pg_t::calc_object_states()
remember_object(st, all);
}
st.obj_start = st.ver_start = i;
st.oid = { .inode = all[i].oid.inode, .stripe = all[i].oid.stripe >> STRIPE_SHIFT };
st.oid = { .inode = all[i].oid.inode, .stripe = all[i].oid.stripe & ~STRIPE_MASK };
st.max_ver = st.target_ver = all[i].version;
st.has_roles = st.n_copies = st.n_roles = st.n_stable = st.n_matched = 0;
st.is_buggy = st.has_old_unstable = false;
@ -192,7 +192,7 @@ void pg_t::calc_object_states()
st.ver_end = i;
i++;
while (i < all.size() && st.oid.inode == all[i].oid.inode &&
st.oid.stripe == (all[i].oid.stripe >> STRIPE_SHIFT))
st.oid.stripe == (all[i].oid.stripe & ~STRIPE_MASK))
{
if (!all[i].is_stable)
{
@ -248,7 +248,7 @@ void pg_t::calc_object_states()
pg.state = pg.state | PG_DEGRADED;
}
printf(
"PG %u is active%s%s%s%s\n", pg.pg_num,
"PG %u is active%s%s%s%s%s\n", pg.pg_num,
(pg.state & PG_DEGRADED) ? " + degraded" : "",
(pg.state & PG_HAS_UNFOUND) ? " + has_unfound" : "",
(pg.state & PG_HAS_DEGRADED) ? " + has_degraded" : "",

View File

@ -20,6 +20,9 @@
#define PG_HAS_MISPLACED (1<<7)
#define PG_HAS_UNCLEAN (1<<8)
// FIXME: Safe default that doesn't depend on parity_block_size or pg_parity_size
#define STRIPE_MASK ((uint64_t)4096 - 1)
// OSD object states
#define OBJ_CLEAN 0x01
#define OBJ_MISPLACED 0x02

View File

@ -43,20 +43,46 @@ void osd_t::finish_primary_op(osd_op_t *cur_op, int retval)
outbox_push(this->clients[cur_op->peer_fd], cur_op);
}
inline void split_stripes(uint64_t pg_minsize, uint32_t bs_block_size, uint64_t start, uint64_t end, osd_read_stripe_t *stripes)
{
for (int role = 0; role < pg_minsize; role++)
{
if (start < (1+role)*bs_block_size && end > role*bs_block_size)
{
stripes[role].real_start = stripes[role].start
= start < role*bs_block_size ? 0 : start-role*bs_block_size;
stripes[role].real_end = stripes[role].end
= end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size;
}
}
}
void osd_t::exec_primary_read(osd_op_t *cur_op)
{
object_id oid = {
.inode = cur_op->op.rw.inode,
.stripe = (cur_op->op.rw.offset / (bs_block_size*2)) << STRIPE_SHIFT,
};
// PG number is calculated from the offset
// Our EC scheme stores data in fixed chunks equal to (K*block size)
// But we must not use K in the process of calculating the PG number
// So we calculate the PG number using a separate setting which should be per-inode (FIXME)
uint64_t start = cur_op->op.rw.offset;
uint64_t end = cur_op->op.rw.offset + cur_op->op.rw.len;
pg_num_t pg_num = (oid % pg_count); // FIXME +1
if (((end - 1) / (bs_block_size*2)) != oid.stripe ||
(start % bs_disk_alignment) || (end % bs_disk_alignment) ||
pg_num > pgs.size() ||
// FIXME: Postpone operations in inactive PGs
!(pgs[pg_num].state & PG_ACTIVE))
// FIXME Real pg_num should equal the below expression + 1
pg_num_t pg_num = (cur_op->op.rw.inode + cur_op->op.rw.offset / parity_block_size) % pg_count;
// FIXME: Postpone operations in inactive PGs
if (pg_num > pgs.size() || !(pgs[pg_num].state & PG_ACTIVE))
{
finish_primary_op(cur_op, -EINVAL);
return;
}
uint64_t pg_parity_size = bs_block_size * pgs[pg_num].pg_minsize;
object_id oid = {
.inode = cur_op->op.rw.inode,
// oid.stripe = starting offset of the parity stripe, so it can be mapped back to the PG
.stripe = (cur_op->op.rw.offset / parity_block_size) * parity_block_size +
((cur_op->op.rw.offset % parity_block_size) / pg_parity_size) * pg_parity_size
};
if (end > (oid.stripe + pg_parity_size) ||
(start % bs_disk_alignment) != 0 ||
(end % bs_disk_alignment) != 0)
{
finish_primary_op(cur_op, -EINVAL);
return;
@ -65,18 +91,10 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
sizeof(osd_primary_read_t) + sizeof(osd_read_stripe_t) * pgs[pg_num].pg_size, 1
);
op_data->oid = oid;
osd_read_stripe_t *stripes = (op_data->stripes = ((osd_read_stripe_t*)(op_data+1)));
op_data->stripes = ((osd_read_stripe_t*)(op_data+1));
cur_op->op_data = op_data;
for (int role = 0; role < pgs[pg_num].pg_minsize; role++)
{
if (start < (1+role)*bs_block_size && end > role*bs_block_size)
{
stripes[role].real_start = stripes[role].start
= start < role*bs_block_size ? 0 : start-role*bs_block_size;
stripes[role].end = stripes[role].real_end
= end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size;
}
}
split_stripes(pgs[pg_num].pg_minsize, bs_block_size, start, end, op_data->stripes);
// Determine version
{
auto vo_it = pgs[pg_num].ver_override.find(oid);
op_data->target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it->second : UINT64_MAX;
@ -95,7 +113,7 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
uint64_t* cur_set = (st_it != pgs[pg_num].obj_states.end()
? st_it->second->read_target.data()
: pgs[pg_num].cur_set.data());
if (extend_missing_stripes(stripes, cur_set, pgs[pg_num].pg_minsize, pgs[pg_num].pg_size) < 0)
if (extend_missing_stripes(op_data->stripes, cur_set, pgs[pg_num].pg_minsize, pgs[pg_num].pg_size) < 0)
{
free(op_data);
finish_primary_op(cur_op, -EIO);
@ -130,18 +148,40 @@ void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok)
if (op_data->degraded)
{
// Reconstruct missing stripes
// FIXME: Always EC(k+1) for now. Add different coding schemes
osd_read_stripe_t *stripes = op_data->stripes;
for (int role = 0; role < op_data->pg_minsize; role++)
{
if (stripes[role].end != 0 && stripes[role].real_end == 0)
{
int other = role == 0 ? 1 : 0;
int parity = op_data->pg_size-1;
memxor(
cur_op->buf + stripes[other].pos + (stripes[other].real_start - stripes[role].start),
cur_op->buf + stripes[parity].pos + (stripes[parity].real_start - stripes[role].start),
cur_op->buf + stripes[role].pos, stripes[role].end - stripes[role].start
);
int prev = -2;
for (int other = 0; other < op_data->pg_size; other++)
{
if (other != role)
{
if (prev == -2)
{
prev = other;
}
else if (prev >= 0)
{
memxor(
cur_op->buf + stripes[prev].pos + (stripes[prev].real_start - stripes[role].start),
cur_op->buf + stripes[other].pos + (stripes[other].real_start - stripes[other].start),
cur_op->buf + stripes[role].pos, stripes[role].end - stripes[role].start
);
prev = -1;
}
else
{
memxor(
cur_op->buf + stripes[role].pos,
cur_op->buf + stripes[other].pos + (stripes[other].real_start - stripes[role].start),
cur_op->buf + stripes[role].pos, stripes[role].end - stripes[role].start
);
}
}
}
}
if (stripes[role].end != 0)
{
@ -286,6 +326,31 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op
// Going by its name, this is meant to execute a write on the primary OSD.
// It is not implemented yet: the body contains only the design notes below and no code.
void osd_t::exec_primary_write(osd_op_t *cur_op)
{
// "RAID5" EC(k+1) parity modification variants (Px = previous, Nx = new):
// Notation: the list on the left enumerates the parts of one stripe, the last one being
// the parity part; "_" stands for a part that is currently missing. Every line describes
// a write of new data into part 1.
// 1,2,3 write N1 -> read P2 -> write N3 = N1^P2
// _,2,3 write N1 -> read P2 -> write N3 = N1^P2
// 1,_,3 write N1 -> read P1,P3 -> write N3 = N1^P3^P1
// 1,2,_ write N1 -> read nothing
// 1,2,3,4 write N1 -> read P2,P3 -> write N4 = N1^P2^P3
// (or read P1,P4 -> write N4 = N1^P4^P1)
// 1,_,3,4 write N1 -> read P1,P4 -> write N4 = N1^P4^P1
// _,2,3,4 write N1 -> read P2,P3 -> write N4 = N1^P3^P2
// 1,2,3,4,5 write N1 -> read P1,P5 -> write N5 = N1^P5^P1
// 1,_,3,4,5 write N1 -> read P1,P5 -> write N5 = N1^P5^P1
// _,2,3,4,5 write N1 -> read P2,P3,P4 -> write N5 = N1^P2^P3^P4
//
// I.e., when we write a part:
// 1) If parity is missing and all other parts are available:
// just overwrite the part
// 2) If the modified part is missing and all other parts are available:
// read all other parts except parity, xor them all with the new data
// 3) If all parts are available and size=3:
// read the paired data stripe, xor it with the new data
// 4) Otherwise:
// read old parity and old data of the modified part, xor them both with the new data
// Ouch. Scary. But faster than the generic variant.
//
// Generic variant for jerasure is a simple RMW process: read all -> decode -> modify -> encode -> write
}
@ -293,10 +358,3 @@ void osd_t::exec_primary_sync(osd_op_t *cur_op)
{
}
// Fills in the reply header of <op> so that it matches the request stored in the same operation.
// Only magic, id and opcode are written; no other reply field is touched here.
void osd_t::make_primary_reply(osd_op_t *op)
{
// Mark the packet as an OSD reply
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
// Echo the request id and opcode back in the reply header
op->reply.hdr.id = op->op.hdr.id;
op->reply.hdr.opcode = op->op.hdr.opcode;
}

View File

@ -94,6 +94,27 @@ int connect_osd(const char *osd_address, int osd_port)
return connect_fd;
}
// Validates a reply packet received for request <op>.
//   r        - result of reading the reply packet; must equal OSD_PACKET_SIZE
//   op       - the request that was sent
//   reply    - the reply that was received
//   expected - the retval the reply must carry
// Prints a diagnostic to stdout and returns false on the first failed check,
// returns true when the reply is complete, matches the request and has the expected retval.
bool check_reply(int r, osd_any_op_t & op, osd_any_reply_t & reply, int expected)
{
    // Without a complete packet there is nothing valid to inspect
    const bool got_full_packet = (r == OSD_PACKET_SIZE);
    if (!got_full_packet)
    {
        printf("read failed\n");
        return false;
    }
    // The reply must carry the reply magic and echo the id and opcode of the request
    const bool header_matches =
        reply.hdr.magic == SECONDARY_OSD_REPLY_MAGIC &&
        reply.hdr.id == op.hdr.id &&
        reply.hdr.opcode == op.hdr.opcode;
    if (!header_matches)
    {
        printf("bad reply: magic, id or opcode does not match request\n");
        return false;
    }
    if (reply.hdr.retval == expected)
    {
        return true;
    }
    printf("operation failed, retval=%ld\n", reply.hdr.retval);
    return false;
}
uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t version, uint64_t pattern)
{
union
@ -116,18 +137,15 @@ uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t ve
op.sec_rw.version = version;
op.sec_rw.offset = 0;
op.sec_rw.len = 128*1024;
void *data = memalign(512, 128*1024);
for (int i = 0; i < 128*1024/sizeof(uint64_t); i++)
void *data = memalign(512, op.sec_rw.len);
for (int i = 0; i < (op.sec_rw.len)/sizeof(uint64_t); i++)
((uint64_t*)data)[i] = pattern;
write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE);
write_blocking(connect_fd, data, 128*1024);
write_blocking(connect_fd, data, op.sec_rw.len);
int r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE);
if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC ||
reply.hdr.id != 1 || reply.hdr.opcode != OSD_OP_SECONDARY_WRITE ||
reply.hdr.retval != 128*1024)
if (!check_reply(r, op, reply, op.sec_rw.len))
{
free(data);
perror("read");
return 0;
}
version = reply.sec_rw.version;
@ -135,12 +153,9 @@ uint64_t test_write(int connect_fd, uint64_t inode, uint64_t stripe, uint64_t ve
op.hdr.id = 2;
write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE);
r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE);
if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC ||
reply.hdr.id != 2 || reply.hdr.opcode != OSD_OP_TEST_SYNC_STAB_ALL ||
reply.hdr.retval != 0)
if (!check_reply(r, op, reply, 0))
{
free(data);
perror("read");
return 0;
}
free(data);
@ -168,12 +183,9 @@ void* test_primary_read(int connect_fd, uint64_t inode, uint64_t offset, uint64_
void *data = memalign(512, len);
write_blocking(connect_fd, op_buf, OSD_PACKET_SIZE);
int r = read_blocking(connect_fd, reply_buf, OSD_PACKET_SIZE);
if (r != OSD_PACKET_SIZE || reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC ||
reply.hdr.id != 1 || reply.hdr.opcode != OSD_OP_READ ||
reply.hdr.retval != len)
if (!check_reply(r, op, reply, len))
{
free(data);
perror("read");
return NULL;
}
r = read_blocking(connect_fd, data, len);