forked from vitalif/vitastor
[Draft] Optimized read
parent
be49998c89
commit
c0f06dea56
|
@ -188,6 +188,187 @@ resume_2:
|
|||
finish_op(cur_op, cur_op->req.rw.len);
|
||||
}
|
||||
|
||||
struct bitmap_request_t
|
||||
{
|
||||
osd_num_t osd_num;
|
||||
object_id oid;
|
||||
uint64_t version;
|
||||
void *bmp_buf;
|
||||
};
|
||||
|
||||
inline bool operator < (const bitmap_request_t & a, const bitmap_request_t & b)
|
||||
{
|
||||
return a.osd_num < b.osd_num || a.osd_num == b.osd_num && a.oid < b.oid;
|
||||
}
|
||||
|
||||
bool osd_t::read_bitmaps()
|
||||
{
|
||||
if (chain.size() > 1)
|
||||
{
|
||||
// First read all bitmaps
|
||||
uint64_t stripe_count = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_size);
|
||||
cur_op->snapshot_bitmaps = malloc_or_die(
|
||||
// bitmaps
|
||||
chain.size() * stripe_count * clean_entry_bitmap_size +
|
||||
// chain itself
|
||||
sizeof(inode_t) * chain.size() +
|
||||
// 'missing' flags for each stripe
|
||||
(pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 0 : chain.size() * pg_size)
|
||||
);
|
||||
inode_t *chain_copy = (inode_t*)((void*)cur_op->snapshot_bitmaps
|
||||
+ chain.size() * stripe_count * clean_entry_bitmap_size);
|
||||
uint8_t *missing_flags = (uint8_t*)((void*)chain_copy
|
||||
+ sizeof(inode_t) * chain.size());
|
||||
memcpy(chain_copy, chain.data(), sizeof(inode_t) * chain.size()); // условно
|
||||
if (pg.state == PG_ACTIVE)
|
||||
{
|
||||
std::vector<bitmap_request_t> bitmap_requests;
|
||||
for (inode_t inode: chain)
|
||||
{
|
||||
object_id cur_oid = { .inode = inode, .stripe = op_data->oid.stripe };
|
||||
auto vo_it = pg.ver_override.find(cur_oid);
|
||||
bitmap_requests.push_back(vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX);
|
||||
}
|
||||
submit_primary_subops(
|
||||
SUBMIT_READ_BITMAPS, target_ver,
|
||||
(op_data->scheme == POOL_SCHEME_REPLICATED ? pg.pg_size : op_data->pg_data_size),
|
||||
pg.cur_set.data(), cur_op
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
std::vector<bitmap_request_t> bitmap_requests;
|
||||
for (inode_t inode: chain)
|
||||
{
|
||||
object_id cur_oid = { .inode = inode, .stripe = op_data->oid.stripe };
|
||||
auto vo_it = pg.ver_override.find(cur_oid);
|
||||
uint64_t target_version = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
|
||||
uint64_t* cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), NULL);
|
||||
if (op_data->scheme == POOL_SCHEME_REPLICATED)
|
||||
{
|
||||
osd_num_t read_target = 0;
|
||||
for (int i = 0; i < pg.pg_size; i++)
|
||||
{
|
||||
if (cur_set[i] == this->osd_num || cur_set[i] != 0 && read_target == 0)
|
||||
{
|
||||
// Select local or any other available OSD for reading
|
||||
read_target = cur_set[i];
|
||||
}
|
||||
}
|
||||
assert(read_target != 0);
|
||||
bitmap_requests.push_back((bitmap_request_t*){
|
||||
.osd_num = read_target,
|
||||
.oid = cur_oid,
|
||||
.version = target_version,
|
||||
.bmp_buf = cur_op->snapshot_bitmaps + chain_num*clean_entry_bitmap_size,
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
osd_rmw_stripe_t local_stripes[pg.pg_size];
|
||||
memcpy(local_stripes, op_data->stripes, sizeof(osd_rmw_stripe_t) * stripe_count);
|
||||
if (extend_missing_stripes(local_stripes, cur_set, op_data->pg_data_size, pg.pg_size) < 0)
|
||||
{
|
||||
free(snapshot_bitmaps);
|
||||
finish_op(cur_op, -EIO);
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < pg.pg_size; i++)
|
||||
{
|
||||
if (cur_set[i] != 0 && local_stripes[i].read_end != 0)
|
||||
{
|
||||
bitmap_requests.push_back((bitmap_request_t*){
|
||||
.osd_num = cur_set[i],
|
||||
.oid = cur_oid,
|
||||
.version = target_version,
|
||||
.bmp_buf = cur_op->snapshot_bitmaps + (chain_num*pg.pg_size + i)*clean_entry_bitmap_size,
|
||||
});
|
||||
missing_flags[chain_num*pg_size + i] = 0;
|
||||
}
|
||||
else
|
||||
missing_flags[chain_num*pg_size + i] = 1;
|
||||
}
|
||||
}
|
||||
chain_num++;
|
||||
}
|
||||
std::sort(bitmap_requests.begin(), bitmap_requests.end());
|
||||
op_data->n_subops = 0;
|
||||
for (int i = 0; i < bitmap_requests.size(); i++)
|
||||
{
|
||||
if ((i == bitmap_requests.size()-1 || bitmap_requests[i+1].osd_num != bitmap_requests[i].osd_num) &&
|
||||
bitmap_requests[i].osd_num != this->osd_num)
|
||||
{
|
||||
op_data->n_subops++;
|
||||
}
|
||||
}
|
||||
if (op_data->n_subops)
|
||||
{
|
||||
op_data->fact_ver = 0;
|
||||
op_data->done = op_data->errors = 0;
|
||||
op_data->subops = new osd_op_t[request_count];
|
||||
}
|
||||
for (int i = 0, subop_idx = 0; i < bitmap_requests.size(); i++)
|
||||
{
|
||||
if (i == bitmap_requests.size()-1 || bitmap_requests[i+1].osd_num != bitmap_requests[i].osd_num)
|
||||
{
|
||||
osd_num_t subop_osd_num = bitmap_requests[i].osd_num;
|
||||
if (subop_osd_num == this->osd_num)
|
||||
{
|
||||
// Read synchronously
|
||||
for (int j = prev; j <= i; j++)
|
||||
{
|
||||
bs->read_bitmap(bitmap_requests[i].oid, bitmap_requests[i].version, bitmap_requests[i].bmp_buf, NULL);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Send to a remote OSD
|
||||
osd_op_t *subop = subops+subop_idx;
|
||||
subop->op_type = OSD_OP_OUT;
|
||||
subop->peer_fd = c_cli.osd_peer_fds.at(subop_osd_num);
|
||||
subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev));
|
||||
subop->req.sec_read_bmp = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = c_cli.next_subop_id++,
|
||||
.opcode = OSD_OP_SEC_READ_BMP,
|
||||
},
|
||||
.len = sizeof(obj_ver_id)*(i+1-prev),
|
||||
};
|
||||
obj_ver_id *ov = (obj_ver_id*)subop->buf;
|
||||
for (int j = prev; j <= i; j++, ov++)
|
||||
{
|
||||
ov->oid = bitmap_requests[i].oid;
|
||||
ov->version = bitmap_requests[i].version;
|
||||
}
|
||||
subop->callback = [cur_op, this](osd_op_t *subop)
|
||||
{
|
||||
int requested_count = subop->req.sec_read_bmp.len/sizeof(obj_ver_id);
|
||||
int fail_fd = subop->reply.hdr.retval != requested_count * (8 + clean_entry_bitmap_size) ? subop->peer_fd : -1;
|
||||
handle_primary_bitmap_subop(subop, cur_op);
|
||||
if (fail_fd >= 0)
|
||||
{
|
||||
// read_bitmaps operation failed, drop the connection
|
||||
c_cli.stop_client(fail_fd);
|
||||
}
|
||||
};
|
||||
c_cli.outbox_push(subop);
|
||||
subop_idx++;
|
||||
}
|
||||
prev = i+1;
|
||||
}
|
||||
}
|
||||
if (op_data->n_subops)
|
||||
{
|
||||
// Wait for subops
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
|
||||
{
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
|
|
|
@ -165,8 +165,10 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
|
|||
.stripe = op_data->oid.stripe | stripe_num,
|
||||
},
|
||||
.version = op_version,
|
||||
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
|
||||
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
|
||||
.offset = submit_type == SUBMIT_READ_BITMAPS ? 0
|
||||
: (wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start),
|
||||
.len = submit_type == SUBMIT_READ_BITMAPS ? 0
|
||||
: (wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start),
|
||||
.buf = wr ? stripes[stripe_num].write_buf : stripes[stripe_num].read_buf,
|
||||
.bitmap = stripes[stripe_num].bmp_buf,
|
||||
});
|
||||
|
@ -196,8 +198,10 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
|
|||
.stripe = op_data->oid.stripe | stripe_num,
|
||||
},
|
||||
.version = op_version,
|
||||
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
|
||||
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
|
||||
.offset = submit_type == SUBMIT_READ_BITMAPS ? 0
|
||||
: (wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start),
|
||||
.len = submit_type == SUBMIT_READ_BITMAPS ? 0
|
||||
: (wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start),
|
||||
.attr_len = wr ? clean_entry_bitmap_size : 0,
|
||||
};
|
||||
#ifdef OSD_DEBUG
|
||||
|
@ -214,7 +218,7 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
|
|||
subops[i].iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start);
|
||||
}
|
||||
}
|
||||
else
|
||||
else if (submit_type != SUBMIT_READ_BITMAPS)
|
||||
{
|
||||
if (stripes[stripe_num].read_end > stripes[stripe_num].read_start)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue