forked from vitalif/vitastor
Compare commits
5 Commits
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | c0f06dea56 | |
Vitaliy Filippov | be49998c89 | |
Vitaliy Filippov | 15636dd3a2 | |
Vitaliy Filippov | f78a959544 | |
Vitaliy Filippov | ac6dba8ddc |
|
@ -38,6 +38,11 @@ void blockstore_t::enqueue_op(blockstore_op_t *op)
|
|||
impl->enqueue_op(op);
|
||||
}
|
||||
|
||||
int blockstore_t::read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version)
|
||||
{
|
||||
return impl->read_bitmap(oid, target_version, bitmap, result_version);
|
||||
}
|
||||
|
||||
std::unordered_map<object_id, uint64_t> & blockstore_t::get_unstable_writes()
|
||||
{
|
||||
return impl->unstable_writes;
|
||||
|
|
|
@ -179,6 +179,9 @@ public:
|
|||
// Submission
|
||||
void enqueue_op(blockstore_op_t *op);
|
||||
|
||||
// Simplified synchronous operation: get object bitmap & current version
|
||||
int read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version = NULL);
|
||||
|
||||
// Unstable writes are added here (map of object_id -> version)
|
||||
std::unordered_map<object_id, uint64_t> & get_unstable_writes();
|
||||
|
||||
|
|
|
@ -443,7 +443,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
|||
}
|
||||
for (; clean_it != clean_end; clean_it++)
|
||||
{
|
||||
if (!pg_count || ((clean_it->first.inode + clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg)
|
||||
if (!pg_count || ((clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
|
||||
{
|
||||
if (stable_count >= stable_alloc)
|
||||
{
|
||||
|
@ -488,7 +488,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
|||
}
|
||||
for (; dirty_it != dirty_end; dirty_it++)
|
||||
{
|
||||
if (!pg_count || ((dirty_it->first.oid.inode + dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg)
|
||||
if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
|
||||
{
|
||||
if (IS_DELETE(dirty_it->second.state))
|
||||
{
|
||||
|
|
|
@ -323,6 +323,9 @@ public:
|
|||
// Submission
|
||||
void enqueue_op(blockstore_op_t *op);
|
||||
|
||||
// Simplified synchronous operation: get object bitmap & current version
|
||||
int read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version = NULL);
|
||||
|
||||
// Unstable writes are added here (map of object_id -> version)
|
||||
std::unordered_map<object_id, uint64_t> unstable_writes;
|
||||
|
||||
|
|
|
@ -268,3 +268,50 @@ void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op
|
|||
FINISH_OP(op);
|
||||
}
|
||||
}
|
||||
|
||||
int blockstore_impl_t::read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version)
|
||||
{
|
||||
auto dirty_it = dirty_db.upper_bound((obj_ver_id){
|
||||
.oid = oid,
|
||||
.version = UINT64_MAX,
|
||||
});
|
||||
if (dirty_it != dirty_db.begin())
|
||||
dirty_it--;
|
||||
if (dirty_it != dirty_db.end())
|
||||
{
|
||||
while (dirty_it->first.oid == oid)
|
||||
{
|
||||
if (target_version >= dirty_it->first.version)
|
||||
{
|
||||
if (result_version)
|
||||
*result_version = dirty_it->first.version;
|
||||
if (bitmap)
|
||||
{
|
||||
void *bmp_ptr = (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap);
|
||||
memcpy(bitmap, bmp_ptr, clean_entry_bitmap_size);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
if (dirty_it == dirty_db.begin())
|
||||
break;
|
||||
dirty_it--;
|
||||
}
|
||||
}
|
||||
auto clean_it = clean_db.find(oid);
|
||||
if (clean_it != clean_db.end())
|
||||
{
|
||||
if (result_version)
|
||||
*result_version = clean_it->second.version;
|
||||
if (bitmap)
|
||||
{
|
||||
void *bmp_ptr = get_clean_entry_bitmap(clean_it->second.location, clean_entry_bitmap_size);
|
||||
memcpy(bitmap, bmp_ptr, clean_entry_bitmap_size);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
if (result_version)
|
||||
*result_version = 0;
|
||||
if (bitmap)
|
||||
memset(bitmap, 0, clean_entry_bitmap_size);
|
||||
return -ENOENT;
|
||||
}
|
||||
|
|
|
@ -582,7 +582,7 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
|
|||
int i = 0;
|
||||
for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
|
||||
{
|
||||
pg_num_t pg_num = (op->cur_inode + stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
|
||||
pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
|
||||
uint64_t begin = (op->offset < stripe ? stripe : op->offset);
|
||||
uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
|
||||
? (stripe + pg_block_size) : (op->offset + op->len);
|
||||
|
|
|
@ -232,6 +232,15 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
|
|||
}
|
||||
cl->read_remaining = cur_op->req.sec_stab.len;
|
||||
}
|
||||
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
|
||||
{
|
||||
if (cur_op->req.sec_read_bmp.len > 0)
|
||||
{
|
||||
cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_read_bmp.len);
|
||||
cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_read_bmp.len);
|
||||
}
|
||||
cl->read_remaining = cur_op->req.sec_read_bmp.len;
|
||||
}
|
||||
else if (cur_op->req.hdr.opcode == OSD_OP_READ)
|
||||
{
|
||||
cl->read_remaining = 0;
|
||||
|
|
|
@ -73,6 +73,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
|||
? (cur_op->req.hdr.opcode == OSD_OP_READ ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_LIST ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
|
||||
: (cur_op->req.hdr.opcode == OSD_OP_WRITE ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
||||
|
|
|
@ -228,6 +228,7 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
|||
cur_op->req.hdr.opcode != OSD_OP_SEC_READ &&
|
||||
cur_op->req.hdr.opcode != OSD_OP_SEC_LIST &&
|
||||
cur_op->req.hdr.opcode != OSD_OP_READ &&
|
||||
cur_op->req.hdr.opcode != OSD_OP_SEC_READ_BMP &&
|
||||
cur_op->req.hdr.opcode != OSD_OP_SHOW_CONFIG)
|
||||
{
|
||||
// Readonly mode
|
||||
|
|
|
@ -226,7 +226,7 @@ class osd_t
|
|||
uint64_t pg_count = pg_counts[INODE_POOL(oid.inode)];
|
||||
if (!pg_count)
|
||||
pg_count = 1;
|
||||
return (oid.inode + oid.stripe / pg_stripe_size) % pg_count + 1;
|
||||
return (oid.stripe / pg_stripe_size) % pg_count + 1;
|
||||
}
|
||||
|
||||
public:
|
||||
|
|
|
@ -20,4 +20,5 @@ const char* osd_op_names[] = {
|
|||
"primary_sync",
|
||||
"primary_delete",
|
||||
"ping",
|
||||
"sec_read_bmp",
|
||||
};
|
||||
|
|
|
@ -28,7 +28,8 @@
|
|||
#define OSD_OP_SYNC 13
|
||||
#define OSD_OP_DELETE 14
|
||||
#define OSD_OP_PING 15
|
||||
#define OSD_OP_MAX 15
|
||||
#define OSD_OP_SEC_READ_BMP 16
|
||||
#define OSD_OP_MAX 16
|
||||
// Alignment & limit for read/write operations
|
||||
#ifndef MEM_ALIGNMENT
|
||||
#define MEM_ALIGNMENT 512
|
||||
|
@ -59,7 +60,7 @@ struct __attribute__((__packed__)) osd_reply_header_t
|
|||
};
|
||||
|
||||
// read or write to the secondary OSD
|
||||
struct __attribute__((__packed__)) osd_op_secondary_rw_t
|
||||
struct __attribute__((__packed__)) osd_op_sec_rw_t
|
||||
{
|
||||
osd_op_header_t header;
|
||||
// object
|
||||
|
@ -76,7 +77,7 @@ struct __attribute__((__packed__)) osd_op_secondary_rw_t
|
|||
uint32_t pad0;
|
||||
};
|
||||
|
||||
struct __attribute__((__packed__)) osd_reply_secondary_rw_t
|
||||
struct __attribute__((__packed__)) osd_reply_sec_rw_t
|
||||
{
|
||||
osd_reply_header_t header;
|
||||
// for reads and writes: assigned or read version number
|
||||
|
@ -87,7 +88,7 @@ struct __attribute__((__packed__)) osd_reply_secondary_rw_t
|
|||
};
|
||||
|
||||
// delete object on the secondary OSD
|
||||
struct __attribute__((__packed__)) osd_op_secondary_del_t
|
||||
struct __attribute__((__packed__)) osd_op_sec_del_t
|
||||
{
|
||||
osd_op_header_t header;
|
||||
// object
|
||||
|
@ -96,37 +97,51 @@ struct __attribute__((__packed__)) osd_op_secondary_del_t
|
|||
uint64_t version;
|
||||
};
|
||||
|
||||
struct __attribute__((__packed__)) osd_reply_secondary_del_t
|
||||
struct __attribute__((__packed__)) osd_reply_sec_del_t
|
||||
{
|
||||
osd_reply_header_t header;
|
||||
uint64_t version;
|
||||
};
|
||||
|
||||
// sync to the secondary OSD
|
||||
struct __attribute__((__packed__)) osd_op_secondary_sync_t
|
||||
struct __attribute__((__packed__)) osd_op_sec_sync_t
|
||||
{
|
||||
osd_op_header_t header;
|
||||
};
|
||||
|
||||
struct __attribute__((__packed__)) osd_reply_secondary_sync_t
|
||||
struct __attribute__((__packed__)) osd_reply_sec_sync_t
|
||||
{
|
||||
osd_reply_header_t header;
|
||||
};
|
||||
|
||||
// stabilize or rollback objects on the secondary OSD
|
||||
struct __attribute__((__packed__)) osd_op_secondary_stabilize_t
|
||||
struct __attribute__((__packed__)) osd_op_sec_stab_t
|
||||
{
|
||||
osd_op_header_t header;
|
||||
// obj_ver_id array length in bytes
|
||||
uint64_t len;
|
||||
};
|
||||
typedef osd_op_secondary_stabilize_t osd_op_secondary_rollback_t;
|
||||
typedef osd_op_sec_stab_t osd_op_sec_rollback_t;
|
||||
|
||||
struct __attribute__((__packed__)) osd_reply_secondary_stabilize_t
|
||||
struct __attribute__((__packed__)) osd_reply_sec_stab_t
|
||||
{
|
||||
osd_reply_header_t header;
|
||||
};
|
||||
typedef osd_reply_secondary_stabilize_t osd_reply_secondary_rollback_t;
|
||||
typedef osd_reply_sec_stab_t osd_reply_sec_rollback_t;
|
||||
|
||||
// bulk read bitmaps from a secondary OSD
|
||||
struct __attribute__((__packed__)) osd_op_sec_read_bmp_t
|
||||
{
|
||||
osd_op_header_t header;
|
||||
// obj_ver_id array length in bytes
|
||||
uint64_t len;
|
||||
};
|
||||
|
||||
struct __attribute__((__packed__)) osd_reply_sec_read_bmp_t
|
||||
{
|
||||
// retval is payload length in bytes. payload is {version,bitmap}[]
|
||||
osd_reply_header_t header;
|
||||
};
|
||||
|
||||
// show configuration
|
||||
struct __attribute__((__packed__)) osd_op_show_config_t
|
||||
|
@ -140,7 +155,7 @@ struct __attribute__((__packed__)) osd_reply_show_config_t
|
|||
};
|
||||
|
||||
// list objects on replica
|
||||
struct __attribute__((__packed__)) osd_op_secondary_list_t
|
||||
struct __attribute__((__packed__)) osd_op_sec_list_t
|
||||
{
|
||||
osd_op_header_t header;
|
||||
// placement group total number and total count
|
||||
|
@ -151,7 +166,7 @@ struct __attribute__((__packed__)) osd_op_secondary_list_t
|
|||
uint64_t min_inode, max_inode;
|
||||
};
|
||||
|
||||
struct __attribute__((__packed__)) osd_reply_secondary_list_t
|
||||
struct __attribute__((__packed__)) osd_reply_sec_list_t
|
||||
{
|
||||
osd_reply_header_t header;
|
||||
// stable object version count. header.retval = total object version count
|
||||
|
@ -194,11 +209,12 @@ struct __attribute__((__packed__)) osd_reply_sync_t
|
|||
union osd_any_op_t
|
||||
{
|
||||
osd_op_header_t hdr;
|
||||
osd_op_secondary_rw_t sec_rw;
|
||||
osd_op_secondary_del_t sec_del;
|
||||
osd_op_secondary_sync_t sec_sync;
|
||||
osd_op_secondary_stabilize_t sec_stab;
|
||||
osd_op_secondary_list_t sec_list;
|
||||
osd_op_sec_rw_t sec_rw;
|
||||
osd_op_sec_del_t sec_del;
|
||||
osd_op_sec_sync_t sec_sync;
|
||||
osd_op_sec_stab_t sec_stab;
|
||||
osd_op_sec_read_bmp_t sec_read_bmp;
|
||||
osd_op_sec_list_t sec_list;
|
||||
osd_op_show_config_t show_conf;
|
||||
osd_op_rw_t rw;
|
||||
osd_op_sync_t sync;
|
||||
|
@ -208,11 +224,12 @@ union osd_any_op_t
|
|||
union osd_any_reply_t
|
||||
{
|
||||
osd_reply_header_t hdr;
|
||||
osd_reply_secondary_rw_t sec_rw;
|
||||
osd_reply_secondary_del_t sec_del;
|
||||
osd_reply_secondary_sync_t sec_sync;
|
||||
osd_reply_secondary_stabilize_t sec_stab;
|
||||
osd_reply_secondary_list_t sec_list;
|
||||
osd_reply_sec_rw_t sec_rw;
|
||||
osd_reply_sec_del_t sec_del;
|
||||
osd_reply_sec_sync_t sec_sync;
|
||||
osd_reply_sec_stab_t sec_stab;
|
||||
osd_reply_sec_read_bmp_t sec_read_bmp;
|
||||
osd_reply_sec_list_t sec_list;
|
||||
osd_reply_show_config_t show_conf;
|
||||
osd_reply_rw_t rw;
|
||||
osd_reply_sync_t sync;
|
||||
|
|
|
@ -35,7 +35,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
|||
// oid.stripe = starting offset of the parity stripe
|
||||
.stripe = (cur_op->req.rw.offset/pg_block_size)*pg_block_size,
|
||||
};
|
||||
pg_num_t pg_num = (cur_op->req.rw.inode + oid.stripe/pool_cfg.pg_stripe_size) % pg_counts[pool_id] + 1;
|
||||
pg_num_t pg_num = (oid.stripe/pool_cfg.pg_stripe_size) % pg_counts[pool_id] + 1; // like map_to_pg()
|
||||
auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
|
||||
if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE))
|
||||
{
|
||||
|
@ -188,6 +188,187 @@ resume_2:
|
|||
finish_op(cur_op, cur_op->req.rw.len);
|
||||
}
|
||||
|
||||
struct bitmap_request_t
|
||||
{
|
||||
osd_num_t osd_num;
|
||||
object_id oid;
|
||||
uint64_t version;
|
||||
void *bmp_buf;
|
||||
};
|
||||
|
||||
inline bool operator < (const bitmap_request_t & a, const bitmap_request_t & b)
|
||||
{
|
||||
return a.osd_num < b.osd_num || a.osd_num == b.osd_num && a.oid < b.oid;
|
||||
}
|
||||
|
||||
bool osd_t::read_bitmaps()
|
||||
{
|
||||
if (chain.size() > 1)
|
||||
{
|
||||
// First read all bitmaps
|
||||
uint64_t stripe_count = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_size);
|
||||
cur_op->snapshot_bitmaps = malloc_or_die(
|
||||
// bitmaps
|
||||
chain.size() * stripe_count * clean_entry_bitmap_size +
|
||||
// chain itself
|
||||
sizeof(inode_t) * chain.size() +
|
||||
// 'missing' flags for each stripe
|
||||
(pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 0 : chain.size() * pg_size)
|
||||
);
|
||||
inode_t *chain_copy = (inode_t*)((void*)cur_op->snapshot_bitmaps
|
||||
+ chain.size() * stripe_count * clean_entry_bitmap_size);
|
||||
uint8_t *missing_flags = (uint8_t*)((void*)chain_copy
|
||||
+ sizeof(inode_t) * chain.size());
|
||||
memcpy(chain_copy, chain.data(), sizeof(inode_t) * chain.size()); // условно
|
||||
if (pg.state == PG_ACTIVE)
|
||||
{
|
||||
std::vector<bitmap_request_t> bitmap_requests;
|
||||
for (inode_t inode: chain)
|
||||
{
|
||||
object_id cur_oid = { .inode = inode, .stripe = op_data->oid.stripe };
|
||||
auto vo_it = pg.ver_override.find(cur_oid);
|
||||
bitmap_requests.push_back(vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX);
|
||||
}
|
||||
submit_primary_subops(
|
||||
SUBMIT_READ_BITMAPS, target_ver,
|
||||
(op_data->scheme == POOL_SCHEME_REPLICATED ? pg.pg_size : op_data->pg_data_size),
|
||||
pg.cur_set.data(), cur_op
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
std::vector<bitmap_request_t> bitmap_requests;
|
||||
for (inode_t inode: chain)
|
||||
{
|
||||
object_id cur_oid = { .inode = inode, .stripe = op_data->oid.stripe };
|
||||
auto vo_it = pg.ver_override.find(cur_oid);
|
||||
uint64_t target_version = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
|
||||
uint64_t* cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), NULL);
|
||||
if (op_data->scheme == POOL_SCHEME_REPLICATED)
|
||||
{
|
||||
osd_num_t read_target = 0;
|
||||
for (int i = 0; i < pg.pg_size; i++)
|
||||
{
|
||||
if (cur_set[i] == this->osd_num || cur_set[i] != 0 && read_target == 0)
|
||||
{
|
||||
// Select local or any other available OSD for reading
|
||||
read_target = cur_set[i];
|
||||
}
|
||||
}
|
||||
assert(read_target != 0);
|
||||
bitmap_requests.push_back((bitmap_request_t*){
|
||||
.osd_num = read_target,
|
||||
.oid = cur_oid,
|
||||
.version = target_version,
|
||||
.bmp_buf = cur_op->snapshot_bitmaps + chain_num*clean_entry_bitmap_size,
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
osd_rmw_stripe_t local_stripes[pg.pg_size];
|
||||
memcpy(local_stripes, op_data->stripes, sizeof(osd_rmw_stripe_t) * stripe_count);
|
||||
if (extend_missing_stripes(local_stripes, cur_set, op_data->pg_data_size, pg.pg_size) < 0)
|
||||
{
|
||||
free(snapshot_bitmaps);
|
||||
finish_op(cur_op, -EIO);
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < pg.pg_size; i++)
|
||||
{
|
||||
if (cur_set[i] != 0 && local_stripes[i].read_end != 0)
|
||||
{
|
||||
bitmap_requests.push_back((bitmap_request_t*){
|
||||
.osd_num = cur_set[i],
|
||||
.oid = cur_oid,
|
||||
.version = target_version,
|
||||
.bmp_buf = cur_op->snapshot_bitmaps + (chain_num*pg.pg_size + i)*clean_entry_bitmap_size,
|
||||
});
|
||||
missing_flags[chain_num*pg_size + i] = 0;
|
||||
}
|
||||
else
|
||||
missing_flags[chain_num*pg_size + i] = 1;
|
||||
}
|
||||
}
|
||||
chain_num++;
|
||||
}
|
||||
std::sort(bitmap_requests.begin(), bitmap_requests.end());
|
||||
op_data->n_subops = 0;
|
||||
for (int i = 0; i < bitmap_requests.size(); i++)
|
||||
{
|
||||
if ((i == bitmap_requests.size()-1 || bitmap_requests[i+1].osd_num != bitmap_requests[i].osd_num) &&
|
||||
bitmap_requests[i].osd_num != this->osd_num)
|
||||
{
|
||||
op_data->n_subops++;
|
||||
}
|
||||
}
|
||||
if (op_data->n_subops)
|
||||
{
|
||||
op_data->fact_ver = 0;
|
||||
op_data->done = op_data->errors = 0;
|
||||
op_data->subops = new osd_op_t[request_count];
|
||||
}
|
||||
for (int i = 0, subop_idx = 0; i < bitmap_requests.size(); i++)
|
||||
{
|
||||
if (i == bitmap_requests.size()-1 || bitmap_requests[i+1].osd_num != bitmap_requests[i].osd_num)
|
||||
{
|
||||
osd_num_t subop_osd_num = bitmap_requests[i].osd_num;
|
||||
if (subop_osd_num == this->osd_num)
|
||||
{
|
||||
// Read synchronously
|
||||
for (int j = prev; j <= i; j++)
|
||||
{
|
||||
bs->read_bitmap(bitmap_requests[i].oid, bitmap_requests[i].version, bitmap_requests[i].bmp_buf, NULL);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Send to a remote OSD
|
||||
osd_op_t *subop = subops+subop_idx;
|
||||
subop->op_type = OSD_OP_OUT;
|
||||
subop->peer_fd = c_cli.osd_peer_fds.at(subop_osd_num);
|
||||
subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev));
|
||||
subop->req.sec_read_bmp = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = c_cli.next_subop_id++,
|
||||
.opcode = OSD_OP_SEC_READ_BMP,
|
||||
},
|
||||
.len = sizeof(obj_ver_id)*(i+1-prev),
|
||||
};
|
||||
obj_ver_id *ov = (obj_ver_id*)subop->buf;
|
||||
for (int j = prev; j <= i; j++, ov++)
|
||||
{
|
||||
ov->oid = bitmap_requests[i].oid;
|
||||
ov->version = bitmap_requests[i].version;
|
||||
}
|
||||
subop->callback = [cur_op, this](osd_op_t *subop)
|
||||
{
|
||||
int requested_count = subop->req.sec_read_bmp.len/sizeof(obj_ver_id);
|
||||
int fail_fd = subop->reply.hdr.retval != requested_count * (8 + clean_entry_bitmap_size) ? subop->peer_fd : -1;
|
||||
handle_primary_bitmap_subop(subop, cur_op);
|
||||
if (fail_fd >= 0)
|
||||
{
|
||||
// read_bitmaps operation failed, drop the connection
|
||||
c_cli.stop_client(fail_fd);
|
||||
}
|
||||
};
|
||||
c_cli.outbox_push(subop);
|
||||
subop_idx++;
|
||||
}
|
||||
prev = i+1;
|
||||
}
|
||||
}
|
||||
if (op_data->n_subops)
|
||||
{
|
||||
// Wait for subops
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
|
||||
{
|
||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||
|
|
|
@ -165,8 +165,10 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
|
|||
.stripe = op_data->oid.stripe | stripe_num,
|
||||
},
|
||||
.version = op_version,
|
||||
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
|
||||
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
|
||||
.offset = submit_type == SUBMIT_READ_BITMAPS ? 0
|
||||
: (wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start),
|
||||
.len = submit_type == SUBMIT_READ_BITMAPS ? 0
|
||||
: (wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start),
|
||||
.buf = wr ? stripes[stripe_num].write_buf : stripes[stripe_num].read_buf,
|
||||
.bitmap = stripes[stripe_num].bmp_buf,
|
||||
});
|
||||
|
@ -196,8 +198,10 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
|
|||
.stripe = op_data->oid.stripe | stripe_num,
|
||||
},
|
||||
.version = op_version,
|
||||
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
|
||||
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
|
||||
.offset = submit_type == SUBMIT_READ_BITMAPS ? 0
|
||||
: (wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start),
|
||||
.len = submit_type == SUBMIT_READ_BITMAPS ? 0
|
||||
: (wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start),
|
||||
.attr_len = wr ? clean_entry_bitmap_size : 0,
|
||||
};
|
||||
#ifdef OSD_DEBUG
|
||||
|
@ -214,7 +218,7 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
|
|||
subops[i].iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start);
|
||||
}
|
||||
}
|
||||
else
|
||||
else if (submit_type != SUBMIT_READ_BITMAPS)
|
||||
{
|
||||
if (stripes[stripe_num].read_end > stripes[stripe_num].read_start)
|
||||
{
|
||||
|
|
|
@ -44,6 +44,25 @@ void osd_t::secondary_op_callback(osd_op_t *op)
|
|||
|
||||
void osd_t::exec_secondary(osd_op_t *cur_op)
|
||||
{
|
||||
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
|
||||
{
|
||||
int n = cur_op->req.sec_read_bmp.len / sizeof(obj_ver_id);
|
||||
if (n > 0)
|
||||
{
|
||||
obj_ver_id *ov = (obj_ver_id*)cur_op->buf;
|
||||
void *reply_buf = malloc_or_die(n * (8 + clean_entry_bitmap_size));
|
||||
void *cur_buf = reply_buf;
|
||||
for (int i = 0; i < n; i++)
|
||||
{
|
||||
bs->read_bitmap(ov[i].oid, ov[i].version, cur_buf + sizeof(uint64_t), (uint64_t*)cur_buf);
|
||||
cur_buf += (8 + clean_entry_bitmap_size);
|
||||
}
|
||||
free(cur_op->buf);
|
||||
cur_op->iov.push_back(cur_op->buf, cur_op->bs_op->retval * sizeof(obj_ver_id));
|
||||
}
|
||||
finish_op(cur_op, n * (8 + clean_entry_bitmap_size));
|
||||
return;
|
||||
}
|
||||
cur_op->bs_op = new blockstore_op_t();
|
||||
cur_op->bs_op->callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
|
||||
cur_op->bs_op->opcode = (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ? BS_OP_READ
|
||||
|
|
Loading…
Reference in New Issue