Compare commits

...

5 Commits

15 changed files with 324 additions and 33 deletions

View File

@ -38,6 +38,11 @@ void blockstore_t::enqueue_op(blockstore_op_t *op)
impl->enqueue_op(op);
}
int blockstore_t::read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version)
{
return impl->read_bitmap(oid, target_version, bitmap, result_version);
}
std::unordered_map<object_id, uint64_t> & blockstore_t::get_unstable_writes()
{
return impl->unstable_writes;

View File

@ -179,6 +179,9 @@ public:
// Submission
void enqueue_op(blockstore_op_t *op);
// Simplified synchronous operation: get object bitmap & current version
int read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version = NULL);
// Unstable writes are added here (map of object_id -> version)
std::unordered_map<object_id, uint64_t> & get_unstable_writes();

View File

@ -443,7 +443,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
}
for (; clean_it != clean_end; clean_it++)
{
if (!pg_count || ((clean_it->first.inode + clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg)
if (!pg_count || ((clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
{
if (stable_count >= stable_alloc)
{
@ -488,7 +488,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
}
for (; dirty_it != dirty_end; dirty_it++)
{
if (!pg_count || ((dirty_it->first.oid.inode + dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg)
if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
{
if (IS_DELETE(dirty_it->second.state))
{

View File

@ -323,6 +323,9 @@ public:
// Submission
void enqueue_op(blockstore_op_t *op);
// Simplified synchronous operation: get object bitmap & current version
int read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version = NULL);
// Unstable writes are added here (map of object_id -> version)
std::unordered_map<object_id, uint64_t> unstable_writes;

View File

@ -268,3 +268,50 @@ void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op
FINISH_OP(op);
}
}
int blockstore_impl_t::read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version)
{
auto dirty_it = dirty_db.upper_bound((obj_ver_id){
.oid = oid,
.version = UINT64_MAX,
});
if (dirty_it != dirty_db.begin())
dirty_it--;
if (dirty_it != dirty_db.end())
{
while (dirty_it->first.oid == oid)
{
if (target_version >= dirty_it->first.version)
{
if (result_version)
*result_version = dirty_it->first.version;
if (bitmap)
{
void *bmp_ptr = (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap);
memcpy(bitmap, bmp_ptr, clean_entry_bitmap_size);
}
return 0;
}
if (dirty_it == dirty_db.begin())
break;
dirty_it--;
}
}
auto clean_it = clean_db.find(oid);
if (clean_it != clean_db.end())
{
if (result_version)
*result_version = clean_it->second.version;
if (bitmap)
{
void *bmp_ptr = get_clean_entry_bitmap(clean_it->second.location, clean_entry_bitmap_size);
memcpy(bitmap, bmp_ptr, clean_entry_bitmap_size);
}
return 0;
}
if (result_version)
*result_version = 0;
if (bitmap)
memset(bitmap, 0, clean_entry_bitmap_size);
return -ENOENT;
}

View File

@ -582,7 +582,7 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
int i = 0;
for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
{
pg_num_t pg_num = (op->cur_inode + stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
uint64_t begin = (op->offset < stripe ? stripe : op->offset);
uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
? (stripe + pg_block_size) : (op->offset + op->len);

View File

@ -232,6 +232,15 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
}
cl->read_remaining = cur_op->req.sec_stab.len;
}
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
{
if (cur_op->req.sec_read_bmp.len > 0)
{
cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_read_bmp.len);
cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_read_bmp.len);
}
cl->read_remaining = cur_op->req.sec_read_bmp.len;
}
else if (cur_op->req.hdr.opcode == OSD_OP_READ)
{
cl->read_remaining = 0;

View File

@ -73,6 +73,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
? (cur_op->req.hdr.opcode == OSD_OP_READ ||
cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
cur_op->req.hdr.opcode == OSD_OP_SEC_LIST ||
cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP ||
cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
: (cur_op->req.hdr.opcode == OSD_OP_WRITE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||

View File

@ -228,6 +228,7 @@ void osd_t::exec_op(osd_op_t *cur_op)
cur_op->req.hdr.opcode != OSD_OP_SEC_READ &&
cur_op->req.hdr.opcode != OSD_OP_SEC_LIST &&
cur_op->req.hdr.opcode != OSD_OP_READ &&
cur_op->req.hdr.opcode != OSD_OP_SEC_READ_BMP &&
cur_op->req.hdr.opcode != OSD_OP_SHOW_CONFIG)
{
// Readonly mode

View File

@ -226,7 +226,7 @@ class osd_t
uint64_t pg_count = pg_counts[INODE_POOL(oid.inode)];
if (!pg_count)
pg_count = 1;
return (oid.inode + oid.stripe / pg_stripe_size) % pg_count + 1;
return (oid.stripe / pg_stripe_size) % pg_count + 1;
}
public:

View File

@ -20,4 +20,5 @@ const char* osd_op_names[] = {
"primary_sync",
"primary_delete",
"ping",
"sec_read_bmp",
};

View File

@ -28,7 +28,8 @@
#define OSD_OP_SYNC 13
#define OSD_OP_DELETE 14
#define OSD_OP_PING 15
#define OSD_OP_MAX 15
#define OSD_OP_SEC_READ_BMP 16
#define OSD_OP_MAX 16
// Alignment & limit for read/write operations
#ifndef MEM_ALIGNMENT
#define MEM_ALIGNMENT 512
@ -59,7 +60,7 @@ struct __attribute__((__packed__)) osd_reply_header_t
};
// read or write to the secondary OSD
struct __attribute__((__packed__)) osd_op_secondary_rw_t
struct __attribute__((__packed__)) osd_op_sec_rw_t
{
osd_op_header_t header;
// object
@ -76,7 +77,7 @@ struct __attribute__((__packed__)) osd_op_secondary_rw_t
uint32_t pad0;
};
struct __attribute__((__packed__)) osd_reply_secondary_rw_t
struct __attribute__((__packed__)) osd_reply_sec_rw_t
{
osd_reply_header_t header;
// for reads and writes: assigned or read version number
@ -87,7 +88,7 @@ struct __attribute__((__packed__)) osd_reply_secondary_rw_t
};
// delete object on the secondary OSD
struct __attribute__((__packed__)) osd_op_secondary_del_t
struct __attribute__((__packed__)) osd_op_sec_del_t
{
osd_op_header_t header;
// object
@ -96,37 +97,51 @@ struct __attribute__((__packed__)) osd_op_secondary_del_t
uint64_t version;
};
struct __attribute__((__packed__)) osd_reply_secondary_del_t
struct __attribute__((__packed__)) osd_reply_sec_del_t
{
osd_reply_header_t header;
uint64_t version;
};
// sync to the secondary OSD
struct __attribute__((__packed__)) osd_op_secondary_sync_t
struct __attribute__((__packed__)) osd_op_sec_sync_t
{
osd_op_header_t header;
};
struct __attribute__((__packed__)) osd_reply_secondary_sync_t
struct __attribute__((__packed__)) osd_reply_sec_sync_t
{
osd_reply_header_t header;
};
// stabilize or rollback objects on the secondary OSD
struct __attribute__((__packed__)) osd_op_secondary_stabilize_t
struct __attribute__((__packed__)) osd_op_sec_stab_t
{
osd_op_header_t header;
// obj_ver_id array length in bytes
uint64_t len;
};
typedef osd_op_secondary_stabilize_t osd_op_secondary_rollback_t;
typedef osd_op_sec_stab_t osd_op_sec_rollback_t;
struct __attribute__((__packed__)) osd_reply_secondary_stabilize_t
struct __attribute__((__packed__)) osd_reply_sec_stab_t
{
osd_reply_header_t header;
};
typedef osd_reply_secondary_stabilize_t osd_reply_secondary_rollback_t;
typedef osd_reply_sec_stab_t osd_reply_sec_rollback_t;
// bulk read bitmaps from a secondary OSD
struct __attribute__((__packed__)) osd_op_sec_read_bmp_t
{
osd_op_header_t header;
// obj_ver_id array length in bytes
uint64_t len;
};
struct __attribute__((__packed__)) osd_reply_sec_read_bmp_t
{
// retval is payload length in bytes. payload is {version,bitmap}[]
osd_reply_header_t header;
};
// show configuration
struct __attribute__((__packed__)) osd_op_show_config_t
@ -140,7 +155,7 @@ struct __attribute__((__packed__)) osd_reply_show_config_t
};
// list objects on replica
struct __attribute__((__packed__)) osd_op_secondary_list_t
struct __attribute__((__packed__)) osd_op_sec_list_t
{
osd_op_header_t header;
// placement group total number and total count
@ -151,7 +166,7 @@ struct __attribute__((__packed__)) osd_op_secondary_list_t
uint64_t min_inode, max_inode;
};
struct __attribute__((__packed__)) osd_reply_secondary_list_t
struct __attribute__((__packed__)) osd_reply_sec_list_t
{
osd_reply_header_t header;
// stable object version count. header.retval = total object version count
@ -194,11 +209,12 @@ struct __attribute__((__packed__)) osd_reply_sync_t
union osd_any_op_t
{
osd_op_header_t hdr;
osd_op_secondary_rw_t sec_rw;
osd_op_secondary_del_t sec_del;
osd_op_secondary_sync_t sec_sync;
osd_op_secondary_stabilize_t sec_stab;
osd_op_secondary_list_t sec_list;
osd_op_sec_rw_t sec_rw;
osd_op_sec_del_t sec_del;
osd_op_sec_sync_t sec_sync;
osd_op_sec_stab_t sec_stab;
osd_op_sec_read_bmp_t sec_read_bmp;
osd_op_sec_list_t sec_list;
osd_op_show_config_t show_conf;
osd_op_rw_t rw;
osd_op_sync_t sync;
@ -208,11 +224,12 @@ union osd_any_op_t
union osd_any_reply_t
{
osd_reply_header_t hdr;
osd_reply_secondary_rw_t sec_rw;
osd_reply_secondary_del_t sec_del;
osd_reply_secondary_sync_t sec_sync;
osd_reply_secondary_stabilize_t sec_stab;
osd_reply_secondary_list_t sec_list;
osd_reply_sec_rw_t sec_rw;
osd_reply_sec_del_t sec_del;
osd_reply_sec_sync_t sec_sync;
osd_reply_sec_stab_t sec_stab;
osd_reply_sec_read_bmp_t sec_read_bmp;
osd_reply_sec_list_t sec_list;
osd_reply_show_config_t show_conf;
osd_reply_rw_t rw;
osd_reply_sync_t sync;

View File

@ -35,7 +35,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
// oid.stripe = starting offset of the parity stripe
.stripe = (cur_op->req.rw.offset/pg_block_size)*pg_block_size,
};
pg_num_t pg_num = (cur_op->req.rw.inode + oid.stripe/pool_cfg.pg_stripe_size) % pg_counts[pool_id] + 1;
pg_num_t pg_num = (oid.stripe/pool_cfg.pg_stripe_size) % pg_counts[pool_id] + 1; // like map_to_pg()
auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE))
{
@ -188,6 +188,187 @@ resume_2:
finish_op(cur_op, cur_op->req.rw.len);
}
struct bitmap_request_t
{
osd_num_t osd_num;
object_id oid;
uint64_t version;
void *bmp_buf;
};
inline bool operator < (const bitmap_request_t & a, const bitmap_request_t & b)
{
return a.osd_num < b.osd_num || a.osd_num == b.osd_num && a.oid < b.oid;
}
bool osd_t::read_bitmaps()
{
if (chain.size() > 1)
{
// First read all bitmaps
uint64_t stripe_count = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_size);
cur_op->snapshot_bitmaps = malloc_or_die(
// bitmaps
chain.size() * stripe_count * clean_entry_bitmap_size +
// chain itself
sizeof(inode_t) * chain.size() +
// 'missing' flags for each stripe
(pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 0 : chain.size() * pg_size)
);
inode_t *chain_copy = (inode_t*)((void*)cur_op->snapshot_bitmaps
+ chain.size() * stripe_count * clean_entry_bitmap_size);
uint8_t *missing_flags = (uint8_t*)((void*)chain_copy
+ sizeof(inode_t) * chain.size());
memcpy(chain_copy, chain.data(), sizeof(inode_t) * chain.size()); // условно
if (pg.state == PG_ACTIVE)
{
std::vector<bitmap_request_t> bitmap_requests;
for (inode_t inode: chain)
{
object_id cur_oid = { .inode = inode, .stripe = op_data->oid.stripe };
auto vo_it = pg.ver_override.find(cur_oid);
bitmap_requests.push_back(vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX);
}
submit_primary_subops(
SUBMIT_READ_BITMAPS, target_ver,
(op_data->scheme == POOL_SCHEME_REPLICATED ? pg.pg_size : op_data->pg_data_size),
pg.cur_set.data(), cur_op
);
}
else
{
std::vector<bitmap_request_t> bitmap_requests;
for (inode_t inode: chain)
{
object_id cur_oid = { .inode = inode, .stripe = op_data->oid.stripe };
auto vo_it = pg.ver_override.find(cur_oid);
uint64_t target_version = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
uint64_t* cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), NULL);
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
osd_num_t read_target = 0;
for (int i = 0; i < pg.pg_size; i++)
{
if (cur_set[i] == this->osd_num || cur_set[i] != 0 && read_target == 0)
{
// Select local or any other available OSD for reading
read_target = cur_set[i];
}
}
assert(read_target != 0);
bitmap_requests.push_back((bitmap_request_t*){
.osd_num = read_target,
.oid = cur_oid,
.version = target_version,
.bmp_buf = cur_op->snapshot_bitmaps + chain_num*clean_entry_bitmap_size,
});
}
else
{
osd_rmw_stripe_t local_stripes[pg.pg_size];
memcpy(local_stripes, op_data->stripes, sizeof(osd_rmw_stripe_t) * stripe_count);
if (extend_missing_stripes(local_stripes, cur_set, op_data->pg_data_size, pg.pg_size) < 0)
{
free(snapshot_bitmaps);
finish_op(cur_op, -EIO);
return;
}
for (int i = 0; i < pg.pg_size; i++)
{
if (cur_set[i] != 0 && local_stripes[i].read_end != 0)
{
bitmap_requests.push_back((bitmap_request_t*){
.osd_num = cur_set[i],
.oid = cur_oid,
.version = target_version,
.bmp_buf = cur_op->snapshot_bitmaps + (chain_num*pg.pg_size + i)*clean_entry_bitmap_size,
});
missing_flags[chain_num*pg_size + i] = 0;
}
else
missing_flags[chain_num*pg_size + i] = 1;
}
}
chain_num++;
}
std::sort(bitmap_requests.begin(), bitmap_requests.end());
op_data->n_subops = 0;
for (int i = 0; i < bitmap_requests.size(); i++)
{
if ((i == bitmap_requests.size()-1 || bitmap_requests[i+1].osd_num != bitmap_requests[i].osd_num) &&
bitmap_requests[i].osd_num != this->osd_num)
{
op_data->n_subops++;
}
}
if (op_data->n_subops)
{
op_data->fact_ver = 0;
op_data->done = op_data->errors = 0;
op_data->subops = new osd_op_t[request_count];
}
for (int i = 0, subop_idx = 0; i < bitmap_requests.size(); i++)
{
if (i == bitmap_requests.size()-1 || bitmap_requests[i+1].osd_num != bitmap_requests[i].osd_num)
{
osd_num_t subop_osd_num = bitmap_requests[i].osd_num;
if (subop_osd_num == this->osd_num)
{
// Read synchronously
for (int j = prev; j <= i; j++)
{
bs->read_bitmap(bitmap_requests[i].oid, bitmap_requests[i].version, bitmap_requests[i].bmp_buf, NULL);
}
}
else
{
// Send to a remote OSD
osd_op_t *subop = subops+subop_idx;
subop->op_type = OSD_OP_OUT;
subop->peer_fd = c_cli.osd_peer_fds.at(subop_osd_num);
subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev));
subop->req.sec_read_bmp = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.opcode = OSD_OP_SEC_READ_BMP,
},
.len = sizeof(obj_ver_id)*(i+1-prev),
};
obj_ver_id *ov = (obj_ver_id*)subop->buf;
for (int j = prev; j <= i; j++, ov++)
{
ov->oid = bitmap_requests[i].oid;
ov->version = bitmap_requests[i].version;
}
subop->callback = [cur_op, this](osd_op_t *subop)
{
int requested_count = subop->req.sec_read_bmp.len/sizeof(obj_ver_id);
int fail_fd = subop->reply.hdr.retval != requested_count * (8 + clean_entry_bitmap_size) ? subop->peer_fd : -1;
handle_primary_bitmap_subop(subop, cur_op);
if (fail_fd >= 0)
{
// read_bitmaps operation failed, drop the connection
c_cli.stop_client(fail_fd);
}
};
c_cli.outbox_push(subop);
subop_idx++;
}
prev = i+1;
}
}
if (op_data->n_subops)
{
// Wait for subops
return;
}
}
}
}
bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
{
osd_primary_op_data_t *op_data = cur_op->op_data;

View File

@ -165,8 +165,10 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
.stripe = op_data->oid.stripe | stripe_num,
},
.version = op_version,
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
.offset = submit_type == SUBMIT_READ_BITMAPS ? 0
: (wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start),
.len = submit_type == SUBMIT_READ_BITMAPS ? 0
: (wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start),
.buf = wr ? stripes[stripe_num].write_buf : stripes[stripe_num].read_buf,
.bitmap = stripes[stripe_num].bmp_buf,
});
@ -196,8 +198,10 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
.stripe = op_data->oid.stripe | stripe_num,
},
.version = op_version,
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
.offset = submit_type == SUBMIT_READ_BITMAPS ? 0
: (wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start),
.len = submit_type == SUBMIT_READ_BITMAPS ? 0
: (wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start),
.attr_len = wr ? clean_entry_bitmap_size : 0,
};
#ifdef OSD_DEBUG
@ -214,7 +218,7 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
subops[i].iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start);
}
}
else
else if (submit_type != SUBMIT_READ_BITMAPS)
{
if (stripes[stripe_num].read_end > stripes[stripe_num].read_start)
{

View File

@ -44,6 +44,25 @@ void osd_t::secondary_op_callback(osd_op_t *op)
void osd_t::exec_secondary(osd_op_t *cur_op)
{
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
{
int n = cur_op->req.sec_read_bmp.len / sizeof(obj_ver_id);
if (n > 0)
{
obj_ver_id *ov = (obj_ver_id*)cur_op->buf;
void *reply_buf = malloc_or_die(n * (8 + clean_entry_bitmap_size));
void *cur_buf = reply_buf;
for (int i = 0; i < n; i++)
{
bs->read_bitmap(ov[i].oid, ov[i].version, cur_buf + sizeof(uint64_t), (uint64_t*)cur_buf);
cur_buf += (8 + clean_entry_bitmap_size);
}
free(cur_op->buf);
cur_op->iov.push_back(cur_op->buf, cur_op->bs_op->retval * sizeof(obj_ver_id));
}
finish_op(cur_op, n * (8 + clean_entry_bitmap_size));
return;
}
cur_op->bs_op = new blockstore_op_t();
cur_op->bs_op->callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
cur_op->bs_op->opcode = (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ? BS_OP_READ