forked from vitalif/vitastor
WIP primary OSD read
parent
f4707709c6
commit
ec50957c41
|
@ -54,3 +54,8 @@ uint64_t blockstore_t::get_block_count()
|
||||||
{
|
{
|
||||||
return impl->get_block_count();
|
return impl->get_block_count();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns the disk alignment value by forwarding the call to the impl object
uint32_t blockstore_t::get_disk_alignment()
|
||||||
|
{
|
||||||
|
return impl->get_disk_alignment();
|
||||||
|
}
|
||||||
|
|
|
@ -13,6 +13,10 @@
|
||||||
#include "object_id.h"
|
#include "object_id.h"
|
||||||
#include "ringloop.h"
|
#include "ringloop.h"
|
||||||
|
|
||||||
|
// Memory alignment for direct I/O (usually 512 bytes)
|
||||||
|
// All other alignments must be a multiple of this one
|
||||||
|
#define MEM_ALIGNMENT 512
|
||||||
|
|
||||||
// Default block size is 128 KB, current allowed range is 4K - 128M
|
// Default block size is 128 KB, current allowed range is 4K - 128M
|
||||||
#define DEFAULT_ORDER 17
|
#define DEFAULT_ORDER 17
|
||||||
#define MIN_BLOCK_SIZE 4*1024
|
#define MIN_BLOCK_SIZE 4*1024
|
||||||
|
@ -100,6 +104,9 @@ public:
|
||||||
// Unstable writes are added here (map of object_id -> version)
|
// Unstable writes are added here (map of object_id -> version)
|
||||||
std::map<object_id, uint64_t> & get_unstable_writes();
|
std::map<object_id, uint64_t> & get_unstable_writes();
|
||||||
|
|
||||||
|
// FIXME rename to object_size
|
||||||
uint32_t get_block_size();
|
uint32_t get_block_size();
|
||||||
uint64_t get_block_count();
|
uint64_t get_block_count();
|
||||||
|
|
||||||
|
uint32_t get_disk_alignment();
|
||||||
};
|
};
|
||||||
|
|
|
@ -22,10 +22,6 @@
|
||||||
|
|
||||||
//#define BLOCKSTORE_DEBUG
|
//#define BLOCKSTORE_DEBUG
|
||||||
|
|
||||||
// Memory alignment for direct I/O (usually 512 bytes)
|
|
||||||
// All other alignments must be a multiple of this one
|
|
||||||
#define MEM_ALIGNMENT 512
|
|
||||||
|
|
||||||
// States are not stored on disk. Instead, they're deduced from the journal
|
// States are not stored on disk. Instead, they're deduced from the journal
|
||||||
// FIXME: Rename to BS_ST_*
|
// FIXME: Rename to BS_ST_*
|
||||||
|
|
||||||
|
@ -183,7 +179,7 @@ class blockstore_impl_t
|
||||||
uint64_t data_offset;
|
uint64_t data_offset;
|
||||||
uint64_t cfg_journal_size;
|
uint64_t cfg_journal_size;
|
||||||
// Required write alignment and journal/metadata/data areas' location alignment
|
// Required write alignment and journal/metadata/data areas' location alignment
|
||||||
uint64_t disk_alignment = 512;
|
uint32_t disk_alignment = 512;
|
||||||
// Journal block size - minimum_io_size of the journal device is the best choice
|
// Journal block size - minimum_io_size of the journal device is the best choice
|
||||||
uint64_t journal_block_size = 512;
|
uint64_t journal_block_size = 512;
|
||||||
// Metadata block size - minimum_io_size of the metadata device is the best choice
|
// Metadata block size - minimum_io_size of the metadata device is the best choice
|
||||||
|
@ -317,4 +313,5 @@ public:
|
||||||
|
|
||||||
inline uint32_t get_block_size() { return block_size; }
|
inline uint32_t get_block_size() { return block_size; }
|
||||||
inline uint64_t get_block_count() { return block_count; }
|
inline uint64_t get_block_count() { return block_count; }
|
||||||
|
// Accessor: returns the disk_alignment member as a 32-bit value
inline uint32_t get_disk_alignment() { return disk_alignment; }
|
||||||
};
|
};
|
||||||
|
|
11
osd.cpp
11
osd.cpp
|
@ -12,6 +12,9 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
||||||
this->config = config;
|
this->config = config;
|
||||||
this->bs = bs;
|
this->bs = bs;
|
||||||
this->ringloop = ringloop;
|
this->ringloop = ringloop;
|
||||||
|
this->bs_block_size = bs->get_block_size();
|
||||||
|
// FIXME: use bitmap granularity instead
|
||||||
|
this->bs_disk_alignment = bs->get_disk_alignment();
|
||||||
|
|
||||||
bind_address = config["bind_address"];
|
bind_address = config["bind_address"];
|
||||||
if (bind_address == "")
|
if (bind_address == "")
|
||||||
|
@ -278,9 +281,13 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
exec_show_config(cur_op);
|
exec_show_config(cur_op);
|
||||||
}
|
}
|
||||||
else if (cur_op->op.hdr.opcode == OSD_OP_READ || cur_op->op.hdr.opcode == OSD_OP_WRITE)
|
else if (cur_op->op.hdr.opcode == OSD_OP_READ)
|
||||||
{
|
{
|
||||||
exec_primary(cur_op);
|
exec_primary_read(cur_op);
|
||||||
|
}
|
||||||
|
else if (cur_op->op.hdr.opcode == OSD_OP_WRITE)
|
||||||
|
{
|
||||||
|
exec_primary_write(cur_op);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
5
osd.h
5
osd.h
|
@ -128,6 +128,7 @@ class osd_t
|
||||||
bool stopping = false;
|
bool stopping = false;
|
||||||
int inflight_ops = 0;
|
int inflight_ops = 0;
|
||||||
blockstore_t *bs;
|
blockstore_t *bs;
|
||||||
|
uint32_t bs_block_size, bs_disk_alignment;
|
||||||
ring_loop_t *ringloop;
|
ring_loop_t *ringloop;
|
||||||
|
|
||||||
int wait_state = 0;
|
int wait_state = 0;
|
||||||
|
@ -172,7 +173,9 @@ class osd_t
|
||||||
void secondary_op_callback(osd_op_t *cur_op);
|
void secondary_op_callback(osd_op_t *cur_op);
|
||||||
|
|
||||||
// primary ops
|
// primary ops
|
||||||
void exec_primary(osd_op_t *cur_op);
|
void exec_primary_read(osd_op_t *cur_op);
|
||||||
|
void exec_primary_write(osd_op_t *cur_op);
|
||||||
|
void exec_primary_sync(osd_op_t *cur_op);
|
||||||
void make_primary_reply(osd_op_t *op);
|
void make_primary_reply(osd_op_t *op);
|
||||||
public:
|
public:
|
||||||
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
|
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
|
||||||
|
|
|
@ -51,8 +51,15 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
|
||||||
}
|
}
|
||||||
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST)
|
else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST)
|
||||||
{
|
{
|
||||||
|
if (cur_op->op.sec_list.pgtotal < cur_op->op.sec_list.pgnum)
|
||||||
|
{
|
||||||
|
// requested pg number is greater than total pg count
|
||||||
|
cur_op->bs_op.retval = -EINVAL;
|
||||||
|
secondary_op_callback(cur_op);
|
||||||
|
return;
|
||||||
|
}
|
||||||
cur_op->bs_op.len = cur_op->op.sec_list.pgtotal;
|
cur_op->bs_op.len = cur_op->op.sec_list.pgtotal;
|
||||||
cur_op->bs_op.offset = cur_op->op.sec_list.pgnum;
|
cur_op->bs_op.offset = cur_op->op.sec_list.pgnum - 1;
|
||||||
}
|
}
|
||||||
#ifdef OSD_STUB
|
#ifdef OSD_STUB
|
||||||
cur_op->bs_op.retval = cur_op->bs_op.len;
|
cur_op->bs_op.retval = cur_op->bs_op.len;
|
||||||
|
@ -68,11 +75,8 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
|
||||||
std::string *cfg_str = new std::string(std::move(json11::Json(config).dump()));
|
std::string *cfg_str = new std::string(std::move(json11::Json(config).dump()));
|
||||||
cur_op->buf = cfg_str;
|
cur_op->buf = cfg_str;
|
||||||
auto & cl = clients[cur_op->peer_fd];
|
auto & cl = clients[cur_op->peer_fd];
|
||||||
cl.write_state = CL_WRITE_READY;
|
|
||||||
write_ready_clients.push_back(cur_op->peer_fd);
|
|
||||||
make_reply(cur_op);
|
make_reply(cur_op);
|
||||||
cl.outbox.push_back(cur_op);
|
outbox_push(cl, cur_op);
|
||||||
ringloop->wakeup();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::exec_sync_stab_all(osd_op_t *cur_op)
|
void osd_t::exec_sync_stab_all(osd_op_t *cur_op)
|
||||||
|
|
16
osd_ops.h
16
osd_ops.h
|
@ -20,7 +20,8 @@
|
||||||
#define OSD_OP_SHOW_CONFIG 9
|
#define OSD_OP_SHOW_CONFIG 9
|
||||||
#define OSD_OP_READ 10
|
#define OSD_OP_READ 10
|
||||||
#define OSD_OP_WRITE 11
|
#define OSD_OP_WRITE 11
|
||||||
#define OSD_OP_MAX 11
|
#define OSD_OP_SYNC 12
|
||||||
|
#define OSD_OP_MAX 12
|
||||||
// Alignment & limit for read/write operations
|
// Alignment & limit for read/write operations
|
||||||
#define OSD_RW_ALIGN 512
|
#define OSD_RW_ALIGN 512
|
||||||
#define OSD_RW_MAX 64*1024*1024
|
#define OSD_RW_MAX 64*1024*1024
|
||||||
|
@ -155,6 +156,17 @@ struct __attribute__((__packed__)) osd_reply_rw_t
|
||||||
osd_reply_header_t header;
|
osd_reply_header_t header;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// sync to the primary OSD
|
||||||
|
// Packed sync request: consists solely of the common operation header, no extra fields
struct __attribute__((__packed__)) osd_op_sync_t
|
||||||
|
{
|
||||||
|
osd_op_header_t header;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Packed sync reply: consists solely of the common reply header, no extra fields
struct __attribute__((__packed__)) osd_reply_sync_t
|
||||||
|
{
|
||||||
|
osd_reply_header_t header;
|
||||||
|
};
|
||||||
|
|
||||||
union osd_any_op_t
|
union osd_any_op_t
|
||||||
{
|
{
|
||||||
osd_op_header_t hdr;
|
osd_op_header_t hdr;
|
||||||
|
@ -165,6 +177,7 @@ union osd_any_op_t
|
||||||
osd_op_secondary_list_t sec_list;
|
osd_op_secondary_list_t sec_list;
|
||||||
osd_op_show_config_t show_conf;
|
osd_op_show_config_t show_conf;
|
||||||
osd_op_rw_t rw;
|
osd_op_rw_t rw;
|
||||||
|
osd_op_sync_t sync;
|
||||||
};
|
};
|
||||||
|
|
||||||
union osd_any_reply_t
|
union osd_any_reply_t
|
||||||
|
@ -177,4 +190,5 @@ union osd_any_reply_t
|
||||||
osd_reply_secondary_list_t sec_list;
|
osd_reply_secondary_list_t sec_list;
|
||||||
osd_reply_show_config_t show_conf;
|
osd_reply_show_config_t show_conf;
|
||||||
osd_reply_rw_t rw;
|
osd_reply_rw_t rw;
|
||||||
|
osd_reply_sync_t sync;
|
||||||
};
|
};
|
||||||
|
|
|
@ -223,8 +223,8 @@ void osd_t::start_pg_peering(int pg_idx)
|
||||||
.id = 1,
|
.id = 1,
|
||||||
.opcode = OSD_OP_SECONDARY_LIST,
|
.opcode = OSD_OP_SECONDARY_LIST,
|
||||||
},
|
},
|
||||||
.pgnum = 1,
|
.pgnum = pg.pg_num,
|
||||||
.pgtotal = 1,
|
.pgtotal = pg_count,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
op->callback = [ps, osd_num](osd_op_t *op)
|
op->callback = [ps, osd_num](osd_op_t *op)
|
||||||
|
|
|
@ -101,8 +101,7 @@ struct pg_t
|
||||||
uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2;
|
uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2;
|
||||||
uint64_t pg_num;
|
uint64_t pg_num;
|
||||||
uint64_t clean_count = 0;
|
uint64_t clean_count = 0;
|
||||||
// target_set = (role => osd_num). role numbers start with zero
|
// target_set = (role => osd_num or UINT64_MAX if missing). role numbers start with zero
|
||||||
// when PG is degraded, target_set only includes 2 OSDs
|
|
||||||
std::vector<uint64_t> target_set;
|
std::vector<uint64_t> target_set;
|
||||||
// moved object map. by default, each object is considered to reside on the target_set.
|
// moved object map. by default, each object is considered to reside on the target_set.
|
||||||
// this map stores all objects that differ.
|
// this map stores all objects that differ.
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
#include "osd.h"
|
#include "osd.h"
|
||||||
|
|
||||||
void osd_t::exec_primary(osd_op_t *cur_op)
|
void osd_t::exec_primary_read(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
// read: read directly or read paired stripe(s), reconstruct, return
|
// read: read directly or read paired stripe(s), reconstruct, return
|
||||||
// write: read paired stripe(s), modify, write
|
// write: read paired stripe(s), modify, write
|
||||||
|
@ -10,6 +10,84 @@ void osd_t::exec_primary(osd_op_t *cur_op)
|
||||||
// and... postpone other write requests to the same stripe until the completion of previous ones
|
// and... postpone other write requests to the same stripe until the completion of previous ones
|
||||||
//
|
//
|
||||||
// sync: sync peers, get unstable versions from somewhere, stabilize them
|
// sync: sync peers, get unstable versions from somewhere, stabilize them
|
||||||
|
object_id oid = {
|
||||||
|
.inode = cur_op->op.rw.inode,
|
||||||
|
.stripe = (cur_op->op.rw.offset / (bs_block_size*2)) << STRIPE_SHIFT,
|
||||||
|
};
|
||||||
|
uint64_t start = cur_op->op.rw.offset, end = cur_op->op.rw.offset + cur_op->op.rw.len;
|
||||||
|
unsigned pg_num = (oid % pg_count); // FIXME +1
|
||||||
|
// oid.stripe stores the stripe index shifted left by STRIPE_SHIFT, so the stripe index
// of the last requested byte must be shifted the same way before the two are compared
if ((((end - 1) / (bs_block_size*2)) << STRIPE_SHIFT) != oid.stripe ||
|
||||||
|
(start % bs_disk_alignment) || (end % bs_disk_alignment) ||
|
||||||
|
pg_num >= pgs.size()) // pg_num is used to index pgs below, so it must stay strictly below pgs.size()
|
||||||
|
{
|
||||||
|
// FIXME add separate magics
|
||||||
|
cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
||||||
|
cur_op->reply.hdr.id = cur_op->op.hdr.id;
|
||||||
|
cur_op->reply.hdr.opcode = cur_op->op.hdr.opcode;
|
||||||
|
cur_op->reply.hdr.retval = -EINVAL;
|
||||||
|
outbox_push(clients[cur_op->peer_fd], cur_op);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// role -> start, end
|
||||||
|
void *buf = memalign(MEM_ALIGNMENT, cur_op->op.rw.len);
|
||||||
|
uint64_t reads[pgs[pg_num].pg_minsize*2] = { 0 };
|
||||||
|
for (int role = 0; role < pgs[pg_num].pg_minsize; role++)
|
||||||
|
{
|
||||||
|
if (start < (1+role)*bs_block_size && end > role*bs_block_size)
|
||||||
|
{
|
||||||
|
reads[role*2] = start < role*bs_block_size ? 0 : start-role*bs_block_size;
|
||||||
|
reads[role*2+1] = end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pgs[pg_num].pg_cursize == 3)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// PG is degraded
|
||||||
|
uint64_t real_reads[pgs[pg_num].pg_size*2] = { 0 };
|
||||||
|
memcpy(real_reads, reads, sizeof(uint64_t)*pgs[pg_num].pg_minsize*2);
|
||||||
|
for (int role = 0; role < pgs[pg_num].pg_minsize; role++)
|
||||||
|
{
|
||||||
|
if (reads[role*2+1] != 0 && pgs[pg_num].target_set[role] == UINT64_MAX)
|
||||||
|
{
|
||||||
|
// Stripe is missing. Extend read to other stripes.
|
||||||
|
// We need at least pg_minsize stripes to recover the lost part.
|
||||||
|
int exist = 0;
|
||||||
|
for (int j = 0; j < pgs[pg_num].pg_size; j++)
|
||||||
|
{
|
||||||
|
if (pgs[pg_num].target_set[j] != UINT64_MAX)
|
||||||
|
{
|
||||||
|
if (real_reads[j*2+1] == 0 || j >= pgs[pg_num].pg_minsize)
|
||||||
|
{
|
||||||
|
real_reads[j*2] = reads[role*2];
|
||||||
|
real_reads[j*2+1] = reads[role*2+1];
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
real_reads[j*2] = reads[j*2] < reads[role*2] ? reads[j*2] : reads[role*2];
|
||||||
|
// Odd slot (j*2+1) holds the end of the range: take the larger of the two ends.
// The even slot (j*2) holds the start and must not be overwritten here.
real_reads[j*2+1] = reads[j*2+1] > reads[role*2+1] ? reads[j*2+1] : reads[role*2+1];
|
||||||
|
}
|
||||||
|
exist++;
|
||||||
|
if (exist >= pgs[pg_num].pg_minsize)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Placeholder for the primary write path: the body is empty, so cur_op is currently left untouched
void osd_t::exec_primary_write(osd_op_t *cur_op)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Placeholder for the primary sync path: the body is empty, so cur_op is currently left untouched
void osd_t::exec_primary_sync(osd_op_t *cur_op)
|
||||||
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue