Add "read bitmaps" operation to secondary OSD protocol

opt-read
Vitaliy Filippov 2021-02-13 23:38:51 +03:00
parent 15636dd3a2
commit be49998c89
6 changed files with 49 additions and 1 deletions

View File

@ -232,6 +232,15 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
}
cl->read_remaining = cur_op->req.sec_stab.len;
}
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
{
if (cur_op->req.sec_read_bmp.len > 0)
{
cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_read_bmp.len);
cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_read_bmp.len);
}
cl->read_remaining = cur_op->req.sec_read_bmp.len;
}
else if (cur_op->req.hdr.opcode == OSD_OP_READ)
{
cl->read_remaining = 0;

View File

@ -73,6 +73,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
? (cur_op->req.hdr.opcode == OSD_OP_READ ||
cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
cur_op->req.hdr.opcode == OSD_OP_SEC_LIST ||
cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP ||
cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
: (cur_op->req.hdr.opcode == OSD_OP_WRITE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||

View File

@ -228,6 +228,7 @@ void osd_t::exec_op(osd_op_t *cur_op)
cur_op->req.hdr.opcode != OSD_OP_SEC_READ &&
cur_op->req.hdr.opcode != OSD_OP_SEC_LIST &&
cur_op->req.hdr.opcode != OSD_OP_READ &&
cur_op->req.hdr.opcode != OSD_OP_SEC_READ_BMP &&
cur_op->req.hdr.opcode != OSD_OP_SHOW_CONFIG)
{
// Readonly mode

View File

@ -20,4 +20,5 @@ const char* osd_op_names[] = {
"primary_sync",
"primary_delete",
"ping",
"sec_read_bmp",
};

View File

@ -28,7 +28,8 @@
#define OSD_OP_SYNC 13
#define OSD_OP_DELETE 14
#define OSD_OP_PING 15
#define OSD_OP_MAX 15
#define OSD_OP_SEC_READ_BMP 16
#define OSD_OP_MAX 16
// Alignment & limit for read/write operations
#ifndef MEM_ALIGNMENT
#define MEM_ALIGNMENT 512
@ -128,6 +129,20 @@ struct __attribute__((__packed__)) osd_reply_sec_stab_t
};
typedef osd_reply_sec_stab_t osd_reply_sec_rollback_t;
// bulk read bitmaps from a secondary OSD
struct __attribute__((__packed__)) osd_op_sec_read_bmp_t
{
osd_op_header_t header;
// obj_ver_id array length in bytes
uint64_t len;
};
struct __attribute__((__packed__)) osd_reply_sec_read_bmp_t
{
// retval is payload length in bytes. payload is {version,bitmap}[]
osd_reply_header_t header;
};
// show configuration
struct __attribute__((__packed__)) osd_op_show_config_t
{
@ -198,6 +213,7 @@ union osd_any_op_t
osd_op_sec_del_t sec_del;
osd_op_sec_sync_t sec_sync;
osd_op_sec_stab_t sec_stab;
osd_op_sec_read_bmp_t sec_read_bmp;
osd_op_sec_list_t sec_list;
osd_op_show_config_t show_conf;
osd_op_rw_t rw;
@ -212,6 +228,7 @@ union osd_any_reply_t
osd_reply_sec_del_t sec_del;
osd_reply_sec_sync_t sec_sync;
osd_reply_sec_stab_t sec_stab;
osd_reply_sec_read_bmp_t sec_read_bmp;
osd_reply_sec_list_t sec_list;
osd_reply_show_config_t show_conf;
osd_reply_rw_t rw;

View File

@ -44,6 +44,25 @@ void osd_t::secondary_op_callback(osd_op_t *op)
void osd_t::exec_secondary(osd_op_t *cur_op)
{
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
{
int n = cur_op->req.sec_read_bmp.len / sizeof(obj_ver_id);
if (n > 0)
{
obj_ver_id *ov = (obj_ver_id*)cur_op->buf;
void *reply_buf = malloc_or_die(n * (8 + clean_entry_bitmap_size));
void *cur_buf = reply_buf;
for (int i = 0; i < n; i++)
{
bs->read_bitmap(ov[i].oid, ov[i].version, cur_buf + sizeof(uint64_t), (uint64_t*)cur_buf);
cur_buf += (8 + clean_entry_bitmap_size);
}
free(cur_op->buf);
cur_op->iov.push_back(cur_op->buf, cur_op->bs_op->retval * sizeof(obj_ver_id));
}
finish_op(cur_op, n * (8 + clean_entry_bitmap_size));
return;
}
cur_op->bs_op = new blockstore_op_t();
cur_op->bs_op->callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
cur_op->bs_op->opcode = (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ? BS_OP_READ