forked from vitalif/vitastor
Compare commits
No commits in common. "opt-read" and "master" have entirely different histories.
|
@ -38,11 +38,6 @@ void blockstore_t::enqueue_op(blockstore_op_t *op)
|
||||||
impl->enqueue_op(op);
|
impl->enqueue_op(op);
|
||||||
}
|
}
|
||||||
|
|
||||||
int blockstore_t::read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version)
|
|
||||||
{
|
|
||||||
return impl->read_bitmap(oid, target_version, bitmap, result_version);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::unordered_map<object_id, uint64_t> & blockstore_t::get_unstable_writes()
|
std::unordered_map<object_id, uint64_t> & blockstore_t::get_unstable_writes()
|
||||||
{
|
{
|
||||||
return impl->unstable_writes;
|
return impl->unstable_writes;
|
||||||
|
|
|
@ -179,9 +179,6 @@ public:
|
||||||
// Submission
|
// Submission
|
||||||
void enqueue_op(blockstore_op_t *op);
|
void enqueue_op(blockstore_op_t *op);
|
||||||
|
|
||||||
// Simplified synchronous operation: get object bitmap & current version
|
|
||||||
int read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version = NULL);
|
|
||||||
|
|
||||||
// Unstable writes are added here (map of object_id -> version)
|
// Unstable writes are added here (map of object_id -> version)
|
||||||
std::unordered_map<object_id, uint64_t> & get_unstable_writes();
|
std::unordered_map<object_id, uint64_t> & get_unstable_writes();
|
||||||
|
|
||||||
|
|
|
@ -443,7 +443,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
||||||
}
|
}
|
||||||
for (; clean_it != clean_end; clean_it++)
|
for (; clean_it != clean_end; clean_it++)
|
||||||
{
|
{
|
||||||
if (!pg_count || ((clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
|
if (!pg_count || ((clean_it->first.inode + clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg)
|
||||||
{
|
{
|
||||||
if (stable_count >= stable_alloc)
|
if (stable_count >= stable_alloc)
|
||||||
{
|
{
|
||||||
|
@ -488,7 +488,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
||||||
}
|
}
|
||||||
for (; dirty_it != dirty_end; dirty_it++)
|
for (; dirty_it != dirty_end; dirty_it++)
|
||||||
{
|
{
|
||||||
if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
|
if (!pg_count || ((dirty_it->first.oid.inode + dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg)
|
||||||
{
|
{
|
||||||
if (IS_DELETE(dirty_it->second.state))
|
if (IS_DELETE(dirty_it->second.state))
|
||||||
{
|
{
|
||||||
|
|
|
@ -323,9 +323,6 @@ public:
|
||||||
// Submission
|
// Submission
|
||||||
void enqueue_op(blockstore_op_t *op);
|
void enqueue_op(blockstore_op_t *op);
|
||||||
|
|
||||||
// Simplified synchronous operation: get object bitmap & current version
|
|
||||||
int read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version = NULL);
|
|
||||||
|
|
||||||
// Unstable writes are added here (map of object_id -> version)
|
// Unstable writes are added here (map of object_id -> version)
|
||||||
std::unordered_map<object_id, uint64_t> unstable_writes;
|
std::unordered_map<object_id, uint64_t> unstable_writes;
|
||||||
|
|
||||||
|
|
|
@ -268,50 +268,3 @@ void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op
|
||||||
FINISH_OP(op);
|
FINISH_OP(op);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int blockstore_impl_t::read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version)
|
|
||||||
{
|
|
||||||
auto dirty_it = dirty_db.upper_bound((obj_ver_id){
|
|
||||||
.oid = oid,
|
|
||||||
.version = UINT64_MAX,
|
|
||||||
});
|
|
||||||
if (dirty_it != dirty_db.begin())
|
|
||||||
dirty_it--;
|
|
||||||
if (dirty_it != dirty_db.end())
|
|
||||||
{
|
|
||||||
while (dirty_it->first.oid == oid)
|
|
||||||
{
|
|
||||||
if (target_version >= dirty_it->first.version)
|
|
||||||
{
|
|
||||||
if (result_version)
|
|
||||||
*result_version = dirty_it->first.version;
|
|
||||||
if (bitmap)
|
|
||||||
{
|
|
||||||
void *bmp_ptr = (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap);
|
|
||||||
memcpy(bitmap, bmp_ptr, clean_entry_bitmap_size);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
if (dirty_it == dirty_db.begin())
|
|
||||||
break;
|
|
||||||
dirty_it--;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
auto clean_it = clean_db.find(oid);
|
|
||||||
if (clean_it != clean_db.end())
|
|
||||||
{
|
|
||||||
if (result_version)
|
|
||||||
*result_version = clean_it->second.version;
|
|
||||||
if (bitmap)
|
|
||||||
{
|
|
||||||
void *bmp_ptr = get_clean_entry_bitmap(clean_it->second.location, clean_entry_bitmap_size);
|
|
||||||
memcpy(bitmap, bmp_ptr, clean_entry_bitmap_size);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
if (result_version)
|
|
||||||
*result_version = 0;
|
|
||||||
if (bitmap)
|
|
||||||
memset(bitmap, 0, clean_entry_bitmap_size);
|
|
||||||
return -ENOENT;
|
|
||||||
}
|
|
||||||
|
|
|
@ -582,7 +582,7 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
|
for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
|
||||||
{
|
{
|
||||||
pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
|
pg_num_t pg_num = (op->cur_inode + stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
|
||||||
uint64_t begin = (op->offset < stripe ? stripe : op->offset);
|
uint64_t begin = (op->offset < stripe ? stripe : op->offset);
|
||||||
uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
|
uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
|
||||||
? (stripe + pg_block_size) : (op->offset + op->len);
|
? (stripe + pg_block_size) : (op->offset + op->len);
|
||||||
|
|
|
@ -232,15 +232,6 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
|
||||||
}
|
}
|
||||||
cl->read_remaining = cur_op->req.sec_stab.len;
|
cl->read_remaining = cur_op->req.sec_stab.len;
|
||||||
}
|
}
|
||||||
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
|
|
||||||
{
|
|
||||||
if (cur_op->req.sec_read_bmp.len > 0)
|
|
||||||
{
|
|
||||||
cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_read_bmp.len);
|
|
||||||
cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_read_bmp.len);
|
|
||||||
}
|
|
||||||
cl->read_remaining = cur_op->req.sec_read_bmp.len;
|
|
||||||
}
|
|
||||||
else if (cur_op->req.hdr.opcode == OSD_OP_READ)
|
else if (cur_op->req.hdr.opcode == OSD_OP_READ)
|
||||||
{
|
{
|
||||||
cl->read_remaining = 0;
|
cl->read_remaining = 0;
|
||||||
|
|
|
@ -73,7 +73,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
||||||
? (cur_op->req.hdr.opcode == OSD_OP_READ ||
|
? (cur_op->req.hdr.opcode == OSD_OP_READ ||
|
||||||
cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
|
cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
|
||||||
cur_op->req.hdr.opcode == OSD_OP_SEC_LIST ||
|
cur_op->req.hdr.opcode == OSD_OP_SEC_LIST ||
|
||||||
cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP ||
|
|
||||||
cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
|
cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
|
||||||
: (cur_op->req.hdr.opcode == OSD_OP_WRITE ||
|
: (cur_op->req.hdr.opcode == OSD_OP_WRITE ||
|
||||||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
||||||
|
|
|
@ -228,7 +228,6 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
||||||
cur_op->req.hdr.opcode != OSD_OP_SEC_READ &&
|
cur_op->req.hdr.opcode != OSD_OP_SEC_READ &&
|
||||||
cur_op->req.hdr.opcode != OSD_OP_SEC_LIST &&
|
cur_op->req.hdr.opcode != OSD_OP_SEC_LIST &&
|
||||||
cur_op->req.hdr.opcode != OSD_OP_READ &&
|
cur_op->req.hdr.opcode != OSD_OP_READ &&
|
||||||
cur_op->req.hdr.opcode != OSD_OP_SEC_READ_BMP &&
|
|
||||||
cur_op->req.hdr.opcode != OSD_OP_SHOW_CONFIG)
|
cur_op->req.hdr.opcode != OSD_OP_SHOW_CONFIG)
|
||||||
{
|
{
|
||||||
// Readonly mode
|
// Readonly mode
|
||||||
|
|
|
@ -226,7 +226,7 @@ class osd_t
|
||||||
uint64_t pg_count = pg_counts[INODE_POOL(oid.inode)];
|
uint64_t pg_count = pg_counts[INODE_POOL(oid.inode)];
|
||||||
if (!pg_count)
|
if (!pg_count)
|
||||||
pg_count = 1;
|
pg_count = 1;
|
||||||
return (oid.stripe / pg_stripe_size) % pg_count + 1;
|
return (oid.inode + oid.stripe / pg_stripe_size) % pg_count + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
|
@ -20,5 +20,4 @@ const char* osd_op_names[] = {
|
||||||
"primary_sync",
|
"primary_sync",
|
||||||
"primary_delete",
|
"primary_delete",
|
||||||
"ping",
|
"ping",
|
||||||
"sec_read_bmp",
|
|
||||||
};
|
};
|
||||||
|
|
|
@ -28,8 +28,7 @@
|
||||||
#define OSD_OP_SYNC 13
|
#define OSD_OP_SYNC 13
|
||||||
#define OSD_OP_DELETE 14
|
#define OSD_OP_DELETE 14
|
||||||
#define OSD_OP_PING 15
|
#define OSD_OP_PING 15
|
||||||
#define OSD_OP_SEC_READ_BMP 16
|
#define OSD_OP_MAX 15
|
||||||
#define OSD_OP_MAX 16
|
|
||||||
// Alignment & limit for read/write operations
|
// Alignment & limit for read/write operations
|
||||||
#ifndef MEM_ALIGNMENT
|
#ifndef MEM_ALIGNMENT
|
||||||
#define MEM_ALIGNMENT 512
|
#define MEM_ALIGNMENT 512
|
||||||
|
@ -60,7 +59,7 @@ struct __attribute__((__packed__)) osd_reply_header_t
|
||||||
};
|
};
|
||||||
|
|
||||||
// read or write to the secondary OSD
|
// read or write to the secondary OSD
|
||||||
struct __attribute__((__packed__)) osd_op_sec_rw_t
|
struct __attribute__((__packed__)) osd_op_secondary_rw_t
|
||||||
{
|
{
|
||||||
osd_op_header_t header;
|
osd_op_header_t header;
|
||||||
// object
|
// object
|
||||||
|
@ -77,7 +76,7 @@ struct __attribute__((__packed__)) osd_op_sec_rw_t
|
||||||
uint32_t pad0;
|
uint32_t pad0;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct __attribute__((__packed__)) osd_reply_sec_rw_t
|
struct __attribute__((__packed__)) osd_reply_secondary_rw_t
|
||||||
{
|
{
|
||||||
osd_reply_header_t header;
|
osd_reply_header_t header;
|
||||||
// for reads and writes: assigned or read version number
|
// for reads and writes: assigned or read version number
|
||||||
|
@ -88,7 +87,7 @@ struct __attribute__((__packed__)) osd_reply_sec_rw_t
|
||||||
};
|
};
|
||||||
|
|
||||||
// delete object on the secondary OSD
|
// delete object on the secondary OSD
|
||||||
struct __attribute__((__packed__)) osd_op_sec_del_t
|
struct __attribute__((__packed__)) osd_op_secondary_del_t
|
||||||
{
|
{
|
||||||
osd_op_header_t header;
|
osd_op_header_t header;
|
||||||
// object
|
// object
|
||||||
|
@ -97,51 +96,37 @@ struct __attribute__((__packed__)) osd_op_sec_del_t
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct __attribute__((__packed__)) osd_reply_sec_del_t
|
struct __attribute__((__packed__)) osd_reply_secondary_del_t
|
||||||
{
|
{
|
||||||
osd_reply_header_t header;
|
osd_reply_header_t header;
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
};
|
};
|
||||||
|
|
||||||
// sync to the secondary OSD
|
// sync to the secondary OSD
|
||||||
struct __attribute__((__packed__)) osd_op_sec_sync_t
|
struct __attribute__((__packed__)) osd_op_secondary_sync_t
|
||||||
{
|
{
|
||||||
osd_op_header_t header;
|
osd_op_header_t header;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct __attribute__((__packed__)) osd_reply_sec_sync_t
|
struct __attribute__((__packed__)) osd_reply_secondary_sync_t
|
||||||
{
|
{
|
||||||
osd_reply_header_t header;
|
osd_reply_header_t header;
|
||||||
};
|
};
|
||||||
|
|
||||||
// stabilize or rollback objects on the secondary OSD
|
// stabilize or rollback objects on the secondary OSD
|
||||||
struct __attribute__((__packed__)) osd_op_sec_stab_t
|
struct __attribute__((__packed__)) osd_op_secondary_stabilize_t
|
||||||
{
|
{
|
||||||
osd_op_header_t header;
|
osd_op_header_t header;
|
||||||
// obj_ver_id array length in bytes
|
// obj_ver_id array length in bytes
|
||||||
uint64_t len;
|
uint64_t len;
|
||||||
};
|
};
|
||||||
typedef osd_op_sec_stab_t osd_op_sec_rollback_t;
|
typedef osd_op_secondary_stabilize_t osd_op_secondary_rollback_t;
|
||||||
|
|
||||||
struct __attribute__((__packed__)) osd_reply_sec_stab_t
|
struct __attribute__((__packed__)) osd_reply_secondary_stabilize_t
|
||||||
{
|
{
|
||||||
osd_reply_header_t header;
|
osd_reply_header_t header;
|
||||||
};
|
};
|
||||||
typedef osd_reply_sec_stab_t osd_reply_sec_rollback_t;
|
typedef osd_reply_secondary_stabilize_t osd_reply_secondary_rollback_t;
|
||||||
|
|
||||||
// bulk read bitmaps from a secondary OSD
|
|
||||||
struct __attribute__((__packed__)) osd_op_sec_read_bmp_t
|
|
||||||
{
|
|
||||||
osd_op_header_t header;
|
|
||||||
// obj_ver_id array length in bytes
|
|
||||||
uint64_t len;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct __attribute__((__packed__)) osd_reply_sec_read_bmp_t
|
|
||||||
{
|
|
||||||
// retval is payload length in bytes. payload is {version,bitmap}[]
|
|
||||||
osd_reply_header_t header;
|
|
||||||
};
|
|
||||||
|
|
||||||
// show configuration
|
// show configuration
|
||||||
struct __attribute__((__packed__)) osd_op_show_config_t
|
struct __attribute__((__packed__)) osd_op_show_config_t
|
||||||
|
@ -155,7 +140,7 @@ struct __attribute__((__packed__)) osd_reply_show_config_t
|
||||||
};
|
};
|
||||||
|
|
||||||
// list objects on replica
|
// list objects on replica
|
||||||
struct __attribute__((__packed__)) osd_op_sec_list_t
|
struct __attribute__((__packed__)) osd_op_secondary_list_t
|
||||||
{
|
{
|
||||||
osd_op_header_t header;
|
osd_op_header_t header;
|
||||||
// placement group total number and total count
|
// placement group total number and total count
|
||||||
|
@ -166,7 +151,7 @@ struct __attribute__((__packed__)) osd_op_sec_list_t
|
||||||
uint64_t min_inode, max_inode;
|
uint64_t min_inode, max_inode;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct __attribute__((__packed__)) osd_reply_sec_list_t
|
struct __attribute__((__packed__)) osd_reply_secondary_list_t
|
||||||
{
|
{
|
||||||
osd_reply_header_t header;
|
osd_reply_header_t header;
|
||||||
// stable object version count. header.retval = total object version count
|
// stable object version count. header.retval = total object version count
|
||||||
|
@ -209,12 +194,11 @@ struct __attribute__((__packed__)) osd_reply_sync_t
|
||||||
union osd_any_op_t
|
union osd_any_op_t
|
||||||
{
|
{
|
||||||
osd_op_header_t hdr;
|
osd_op_header_t hdr;
|
||||||
osd_op_sec_rw_t sec_rw;
|
osd_op_secondary_rw_t sec_rw;
|
||||||
osd_op_sec_del_t sec_del;
|
osd_op_secondary_del_t sec_del;
|
||||||
osd_op_sec_sync_t sec_sync;
|
osd_op_secondary_sync_t sec_sync;
|
||||||
osd_op_sec_stab_t sec_stab;
|
osd_op_secondary_stabilize_t sec_stab;
|
||||||
osd_op_sec_read_bmp_t sec_read_bmp;
|
osd_op_secondary_list_t sec_list;
|
||||||
osd_op_sec_list_t sec_list;
|
|
||||||
osd_op_show_config_t show_conf;
|
osd_op_show_config_t show_conf;
|
||||||
osd_op_rw_t rw;
|
osd_op_rw_t rw;
|
||||||
osd_op_sync_t sync;
|
osd_op_sync_t sync;
|
||||||
|
@ -224,12 +208,11 @@ union osd_any_op_t
|
||||||
union osd_any_reply_t
|
union osd_any_reply_t
|
||||||
{
|
{
|
||||||
osd_reply_header_t hdr;
|
osd_reply_header_t hdr;
|
||||||
osd_reply_sec_rw_t sec_rw;
|
osd_reply_secondary_rw_t sec_rw;
|
||||||
osd_reply_sec_del_t sec_del;
|
osd_reply_secondary_del_t sec_del;
|
||||||
osd_reply_sec_sync_t sec_sync;
|
osd_reply_secondary_sync_t sec_sync;
|
||||||
osd_reply_sec_stab_t sec_stab;
|
osd_reply_secondary_stabilize_t sec_stab;
|
||||||
osd_reply_sec_read_bmp_t sec_read_bmp;
|
osd_reply_secondary_list_t sec_list;
|
||||||
osd_reply_sec_list_t sec_list;
|
|
||||||
osd_reply_show_config_t show_conf;
|
osd_reply_show_config_t show_conf;
|
||||||
osd_reply_rw_t rw;
|
osd_reply_rw_t rw;
|
||||||
osd_reply_sync_t sync;
|
osd_reply_sync_t sync;
|
||||||
|
|
|
@ -35,7 +35,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
|
||||||
// oid.stripe = starting offset of the parity stripe
|
// oid.stripe = starting offset of the parity stripe
|
||||||
.stripe = (cur_op->req.rw.offset/pg_block_size)*pg_block_size,
|
.stripe = (cur_op->req.rw.offset/pg_block_size)*pg_block_size,
|
||||||
};
|
};
|
||||||
pg_num_t pg_num = (oid.stripe/pool_cfg.pg_stripe_size) % pg_counts[pool_id] + 1; // like map_to_pg()
|
pg_num_t pg_num = (cur_op->req.rw.inode + oid.stripe/pool_cfg.pg_stripe_size) % pg_counts[pool_id] + 1;
|
||||||
auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
|
auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
|
||||||
if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE))
|
if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE))
|
||||||
{
|
{
|
||||||
|
@ -188,187 +188,6 @@ resume_2:
|
||||||
finish_op(cur_op, cur_op->req.rw.len);
|
finish_op(cur_op, cur_op->req.rw.len);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct bitmap_request_t
|
|
||||||
{
|
|
||||||
osd_num_t osd_num;
|
|
||||||
object_id oid;
|
|
||||||
uint64_t version;
|
|
||||||
void *bmp_buf;
|
|
||||||
};
|
|
||||||
|
|
||||||
inline bool operator < (const bitmap_request_t & a, const bitmap_request_t & b)
|
|
||||||
{
|
|
||||||
return a.osd_num < b.osd_num || a.osd_num == b.osd_num && a.oid < b.oid;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool osd_t::read_bitmaps()
|
|
||||||
{
|
|
||||||
if (chain.size() > 1)
|
|
||||||
{
|
|
||||||
// First read all bitmaps
|
|
||||||
uint64_t stripe_count = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_size);
|
|
||||||
cur_op->snapshot_bitmaps = malloc_or_die(
|
|
||||||
// bitmaps
|
|
||||||
chain.size() * stripe_count * clean_entry_bitmap_size +
|
|
||||||
// chain itself
|
|
||||||
sizeof(inode_t) * chain.size() +
|
|
||||||
// 'missing' flags for each stripe
|
|
||||||
(pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 0 : chain.size() * pg_size)
|
|
||||||
);
|
|
||||||
inode_t *chain_copy = (inode_t*)((void*)cur_op->snapshot_bitmaps
|
|
||||||
+ chain.size() * stripe_count * clean_entry_bitmap_size);
|
|
||||||
uint8_t *missing_flags = (uint8_t*)((void*)chain_copy
|
|
||||||
+ sizeof(inode_t) * chain.size());
|
|
||||||
memcpy(chain_copy, chain.data(), sizeof(inode_t) * chain.size()); // условно
|
|
||||||
if (pg.state == PG_ACTIVE)
|
|
||||||
{
|
|
||||||
std::vector<bitmap_request_t> bitmap_requests;
|
|
||||||
for (inode_t inode: chain)
|
|
||||||
{
|
|
||||||
object_id cur_oid = { .inode = inode, .stripe = op_data->oid.stripe };
|
|
||||||
auto vo_it = pg.ver_override.find(cur_oid);
|
|
||||||
bitmap_requests.push_back(vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX);
|
|
||||||
}
|
|
||||||
submit_primary_subops(
|
|
||||||
SUBMIT_READ_BITMAPS, target_ver,
|
|
||||||
(op_data->scheme == POOL_SCHEME_REPLICATED ? pg.pg_size : op_data->pg_data_size),
|
|
||||||
pg.cur_set.data(), cur_op
|
|
||||||
);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
std::vector<bitmap_request_t> bitmap_requests;
|
|
||||||
for (inode_t inode: chain)
|
|
||||||
{
|
|
||||||
object_id cur_oid = { .inode = inode, .stripe = op_data->oid.stripe };
|
|
||||||
auto vo_it = pg.ver_override.find(cur_oid);
|
|
||||||
uint64_t target_version = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
|
|
||||||
uint64_t* cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), NULL);
|
|
||||||
if (op_data->scheme == POOL_SCHEME_REPLICATED)
|
|
||||||
{
|
|
||||||
osd_num_t read_target = 0;
|
|
||||||
for (int i = 0; i < pg.pg_size; i++)
|
|
||||||
{
|
|
||||||
if (cur_set[i] == this->osd_num || cur_set[i] != 0 && read_target == 0)
|
|
||||||
{
|
|
||||||
// Select local or any other available OSD for reading
|
|
||||||
read_target = cur_set[i];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert(read_target != 0);
|
|
||||||
bitmap_requests.push_back((bitmap_request_t*){
|
|
||||||
.osd_num = read_target,
|
|
||||||
.oid = cur_oid,
|
|
||||||
.version = target_version,
|
|
||||||
.bmp_buf = cur_op->snapshot_bitmaps + chain_num*clean_entry_bitmap_size,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
osd_rmw_stripe_t local_stripes[pg.pg_size];
|
|
||||||
memcpy(local_stripes, op_data->stripes, sizeof(osd_rmw_stripe_t) * stripe_count);
|
|
||||||
if (extend_missing_stripes(local_stripes, cur_set, op_data->pg_data_size, pg.pg_size) < 0)
|
|
||||||
{
|
|
||||||
free(snapshot_bitmaps);
|
|
||||||
finish_op(cur_op, -EIO);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
for (int i = 0; i < pg.pg_size; i++)
|
|
||||||
{
|
|
||||||
if (cur_set[i] != 0 && local_stripes[i].read_end != 0)
|
|
||||||
{
|
|
||||||
bitmap_requests.push_back((bitmap_request_t*){
|
|
||||||
.osd_num = cur_set[i],
|
|
||||||
.oid = cur_oid,
|
|
||||||
.version = target_version,
|
|
||||||
.bmp_buf = cur_op->snapshot_bitmaps + (chain_num*pg.pg_size + i)*clean_entry_bitmap_size,
|
|
||||||
});
|
|
||||||
missing_flags[chain_num*pg_size + i] = 0;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
missing_flags[chain_num*pg_size + i] = 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
chain_num++;
|
|
||||||
}
|
|
||||||
std::sort(bitmap_requests.begin(), bitmap_requests.end());
|
|
||||||
op_data->n_subops = 0;
|
|
||||||
for (int i = 0; i < bitmap_requests.size(); i++)
|
|
||||||
{
|
|
||||||
if ((i == bitmap_requests.size()-1 || bitmap_requests[i+1].osd_num != bitmap_requests[i].osd_num) &&
|
|
||||||
bitmap_requests[i].osd_num != this->osd_num)
|
|
||||||
{
|
|
||||||
op_data->n_subops++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (op_data->n_subops)
|
|
||||||
{
|
|
||||||
op_data->fact_ver = 0;
|
|
||||||
op_data->done = op_data->errors = 0;
|
|
||||||
op_data->subops = new osd_op_t[request_count];
|
|
||||||
}
|
|
||||||
for (int i = 0, subop_idx = 0; i < bitmap_requests.size(); i++)
|
|
||||||
{
|
|
||||||
if (i == bitmap_requests.size()-1 || bitmap_requests[i+1].osd_num != bitmap_requests[i].osd_num)
|
|
||||||
{
|
|
||||||
osd_num_t subop_osd_num = bitmap_requests[i].osd_num;
|
|
||||||
if (subop_osd_num == this->osd_num)
|
|
||||||
{
|
|
||||||
// Read synchronously
|
|
||||||
for (int j = prev; j <= i; j++)
|
|
||||||
{
|
|
||||||
bs->read_bitmap(bitmap_requests[i].oid, bitmap_requests[i].version, bitmap_requests[i].bmp_buf, NULL);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// Send to a remote OSD
|
|
||||||
osd_op_t *subop = subops+subop_idx;
|
|
||||||
subop->op_type = OSD_OP_OUT;
|
|
||||||
subop->peer_fd = c_cli.osd_peer_fds.at(subop_osd_num);
|
|
||||||
subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev));
|
|
||||||
subop->req.sec_read_bmp = {
|
|
||||||
.header = {
|
|
||||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
|
||||||
.id = c_cli.next_subop_id++,
|
|
||||||
.opcode = OSD_OP_SEC_READ_BMP,
|
|
||||||
},
|
|
||||||
.len = sizeof(obj_ver_id)*(i+1-prev),
|
|
||||||
};
|
|
||||||
obj_ver_id *ov = (obj_ver_id*)subop->buf;
|
|
||||||
for (int j = prev; j <= i; j++, ov++)
|
|
||||||
{
|
|
||||||
ov->oid = bitmap_requests[i].oid;
|
|
||||||
ov->version = bitmap_requests[i].version;
|
|
||||||
}
|
|
||||||
subop->callback = [cur_op, this](osd_op_t *subop)
|
|
||||||
{
|
|
||||||
int requested_count = subop->req.sec_read_bmp.len/sizeof(obj_ver_id);
|
|
||||||
int fail_fd = subop->reply.hdr.retval != requested_count * (8 + clean_entry_bitmap_size) ? subop->peer_fd : -1;
|
|
||||||
handle_primary_bitmap_subop(subop, cur_op);
|
|
||||||
if (fail_fd >= 0)
|
|
||||||
{
|
|
||||||
// read_bitmaps operation failed, drop the connection
|
|
||||||
c_cli.stop_client(fail_fd);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
c_cli.outbox_push(subop);
|
|
||||||
subop_idx++;
|
|
||||||
}
|
|
||||||
prev = i+1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (op_data->n_subops)
|
|
||||||
{
|
|
||||||
// Wait for subops
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
|
bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
|
||||||
{
|
{
|
||||||
osd_primary_op_data_t *op_data = cur_op->op_data;
|
osd_primary_op_data_t *op_data = cur_op->op_data;
|
||||||
|
|
|
@ -165,10 +165,8 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
|
||||||
.stripe = op_data->oid.stripe | stripe_num,
|
.stripe = op_data->oid.stripe | stripe_num,
|
||||||
},
|
},
|
||||||
.version = op_version,
|
.version = op_version,
|
||||||
.offset = submit_type == SUBMIT_READ_BITMAPS ? 0
|
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
|
||||||
: (wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start),
|
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
|
||||||
.len = submit_type == SUBMIT_READ_BITMAPS ? 0
|
|
||||||
: (wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start),
|
|
||||||
.buf = wr ? stripes[stripe_num].write_buf : stripes[stripe_num].read_buf,
|
.buf = wr ? stripes[stripe_num].write_buf : stripes[stripe_num].read_buf,
|
||||||
.bitmap = stripes[stripe_num].bmp_buf,
|
.bitmap = stripes[stripe_num].bmp_buf,
|
||||||
});
|
});
|
||||||
|
@ -198,10 +196,8 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
|
||||||
.stripe = op_data->oid.stripe | stripe_num,
|
.stripe = op_data->oid.stripe | stripe_num,
|
||||||
},
|
},
|
||||||
.version = op_version,
|
.version = op_version,
|
||||||
.offset = submit_type == SUBMIT_READ_BITMAPS ? 0
|
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
|
||||||
: (wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start),
|
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
|
||||||
.len = submit_type == SUBMIT_READ_BITMAPS ? 0
|
|
||||||
: (wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start),
|
|
||||||
.attr_len = wr ? clean_entry_bitmap_size : 0,
|
.attr_len = wr ? clean_entry_bitmap_size : 0,
|
||||||
};
|
};
|
||||||
#ifdef OSD_DEBUG
|
#ifdef OSD_DEBUG
|
||||||
|
@ -218,7 +214,7 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
|
||||||
subops[i].iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start);
|
subops[i].iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (submit_type != SUBMIT_READ_BITMAPS)
|
else
|
||||||
{
|
{
|
||||||
if (stripes[stripe_num].read_end > stripes[stripe_num].read_start)
|
if (stripes[stripe_num].read_end > stripes[stripe_num].read_start)
|
||||||
{
|
{
|
||||||
|
|
|
@ -44,25 +44,6 @@ void osd_t::secondary_op_callback(osd_op_t *op)
|
||||||
|
|
||||||
void osd_t::exec_secondary(osd_op_t *cur_op)
|
void osd_t::exec_secondary(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
|
|
||||||
{
|
|
||||||
int n = cur_op->req.sec_read_bmp.len / sizeof(obj_ver_id);
|
|
||||||
if (n > 0)
|
|
||||||
{
|
|
||||||
obj_ver_id *ov = (obj_ver_id*)cur_op->buf;
|
|
||||||
void *reply_buf = malloc_or_die(n * (8 + clean_entry_bitmap_size));
|
|
||||||
void *cur_buf = reply_buf;
|
|
||||||
for (int i = 0; i < n; i++)
|
|
||||||
{
|
|
||||||
bs->read_bitmap(ov[i].oid, ov[i].version, cur_buf + sizeof(uint64_t), (uint64_t*)cur_buf);
|
|
||||||
cur_buf += (8 + clean_entry_bitmap_size);
|
|
||||||
}
|
|
||||||
free(cur_op->buf);
|
|
||||||
cur_op->iov.push_back(cur_op->buf, cur_op->bs_op->retval * sizeof(obj_ver_id));
|
|
||||||
}
|
|
||||||
finish_op(cur_op, n * (8 + clean_entry_bitmap_size));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
cur_op->bs_op = new blockstore_op_t();
|
cur_op->bs_op = new blockstore_op_t();
|
||||||
cur_op->bs_op->callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
|
cur_op->bs_op->callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
|
||||||
cur_op->bs_op->opcode = (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ? BS_OP_READ
|
cur_op->bs_op->opcode = (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ? BS_OP_READ
|
||||||
|
|
Loading…
Reference in New Issue