diff --git a/blockstore.cpp b/blockstore.cpp index 2f4c5ab05..891ff23d0 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -54,3 +54,8 @@ uint64_t blockstore_t::get_block_count() { return impl->get_block_count(); } + +uint32_t blockstore_t::get_disk_alignment() +{ + return impl->get_disk_alignment(); +} diff --git a/blockstore.h b/blockstore.h index 6377e00c0..2872ab45f 100644 --- a/blockstore.h +++ b/blockstore.h @@ -13,6 +13,10 @@ #include "object_id.h" #include "ringloop.h" +// Memory alignment for direct I/O (usually 512 bytes) +// All other alignments must be a multiple of this one +#define MEM_ALIGNMENT 512 + // Default block size is 128 KB, current allowed range is 4K - 128M #define DEFAULT_ORDER 17 #define MIN_BLOCK_SIZE 4*1024 @@ -100,6 +104,9 @@ public: // Unstable writes are added here (map of object_id -> version) std::map & get_unstable_writes(); + // FIXME rename to object_size uint32_t get_block_size(); uint64_t get_block_count(); + + uint32_t get_disk_alignment(); }; diff --git a/blockstore_impl.h b/blockstore_impl.h index b9cd175dd..718100622 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -22,10 +22,6 @@ //#define BLOCKSTORE_DEBUG -// Memory alignment for direct I/O (usually 512 bytes) -// All other alignments must be a multiple of this one -#define MEM_ALIGNMENT 512 - // States are not stored on disk. Instead, they're deduced from the journal // FIXME: Rename to BS_ST_* @@ -183,7 +179,7 @@ class blockstore_impl_t uint64_t data_offset; uint64_t cfg_journal_size; // Required write alignment and journal/metadata/data areas' location alignment - uint64_t disk_alignment = 512; + uint32_t disk_alignment = 512; // Journal block size - minimum_io_size of the journal device is the best choice uint64_t journal_block_size = 512; // Metadata block size - minimum_io_size of the metadata device is the best choice @@ -317,4 +313,5 @@ public: inline uint32_t get_block_size() { return block_size; } inline uint64_t get_block_count() { return block_count; } + inline uint32_t get_disk_alignment() { return disk_alignment; } }; diff --git a/osd.cpp b/osd.cpp index afbddcccc..b4099bfac 100644 --- a/osd.cpp +++ b/osd.cpp @@ -12,6 +12,9 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo this->config = config; this->bs = bs; this->ringloop = ringloop; + this->bs_block_size = bs->get_block_size(); + // FIXME: use bitmap granularity instead + this->bs_disk_alignment = bs->get_disk_alignment(); bind_address = config["bind_address"]; if (bind_address == "") @@ -278,9 +281,13 @@ void osd_t::exec_op(osd_op_t *cur_op) { exec_show_config(cur_op); } - else if (cur_op->op.hdr.opcode == OSD_OP_READ || cur_op->op.hdr.opcode == OSD_OP_WRITE) + else if (cur_op->op.hdr.opcode == OSD_OP_READ) { - exec_primary(cur_op); + exec_primary_read(cur_op); + } + else if (cur_op->op.hdr.opcode == OSD_OP_WRITE) + { + exec_primary_write(cur_op); } else { diff --git a/osd.h b/osd.h index 297809f70..0dae0bfe8 100644 --- a/osd.h +++ b/osd.h @@ -128,6 +128,7 @@ class osd_t bool stopping = false; int inflight_ops = 0; blockstore_t *bs; + uint32_t bs_block_size, bs_disk_alignment; ring_loop_t *ringloop; int wait_state = 0; @@ -172,7 +173,9 @@ class osd_t void secondary_op_callback(osd_op_t *cur_op); // primary ops - void exec_primary(osd_op_t *cur_op); + void exec_primary_read(osd_op_t *cur_op); + void exec_primary_write(osd_op_t *cur_op); + void exec_primary_sync(osd_op_t *cur_op); void make_primary_reply(osd_op_t *op); public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); diff --git a/osd_exec_secondary.cpp b/osd_exec_secondary.cpp index 4d6f0936c..b48cfccc9 100644 --- a/osd_exec_secondary.cpp +++ b/osd_exec_secondary.cpp @@ -51,8 +51,15 @@ void osd_t::exec_secondary(osd_op_t *cur_op) } else if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_LIST) { + if (cur_op->op.sec_list.pgtotal < cur_op->op.sec_list.pgnum) + { + // requested pg number is greater than total pg count + cur_op->bs_op.retval = -EINVAL; + secondary_op_callback(cur_op); + return; + } cur_op->bs_op.len = cur_op->op.sec_list.pgtotal; - cur_op->bs_op.offset = cur_op->op.sec_list.pgnum; + cur_op->bs_op.offset = cur_op->op.sec_list.pgnum - 1; } #ifdef OSD_STUB cur_op->bs_op.retval = cur_op->bs_op.len; @@ -68,11 +75,8 @@ void osd_t::exec_show_config(osd_op_t *cur_op) std::string *cfg_str = new std::string(std::move(json11::Json(config).dump())); cur_op->buf = cfg_str; auto & cl = clients[cur_op->peer_fd]; - cl.write_state = CL_WRITE_READY; - write_ready_clients.push_back(cur_op->peer_fd); make_reply(cur_op); - cl.outbox.push_back(cur_op); - ringloop->wakeup(); + outbox_push(cl, cur_op); } void osd_t::exec_sync_stab_all(osd_op_t *cur_op) diff --git a/osd_ops.h b/osd_ops.h index 815fa6a89..936eb2f9a 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -20,7 +20,8 @@ #define OSD_OP_SHOW_CONFIG 9 #define OSD_OP_READ 10 #define OSD_OP_WRITE 11 -#define OSD_OP_MAX 11 +#define OSD_OP_SYNC 12 +#define OSD_OP_MAX 12 // Alignment & limit for read/write operations #define OSD_RW_ALIGN 512 #define OSD_RW_MAX 64*1024*1024 @@ -155,6 +156,17 @@ struct __attribute__((__packed__)) osd_reply_rw_t osd_reply_header_t header; }; +// sync to the primary OSD +struct __attribute__((__packed__)) osd_op_sync_t +{ + osd_op_header_t header; +}; + +struct __attribute__((__packed__)) osd_reply_sync_t +{ + osd_reply_header_t header; +}; + union osd_any_op_t { osd_op_header_t hdr; @@ -165,6 +177,7 @@ union osd_any_op_t osd_op_secondary_list_t sec_list; osd_op_show_config_t show_conf; osd_op_rw_t rw; + osd_op_sync_t sync; }; union osd_any_reply_t @@ -177,4 +190,5 @@ union osd_any_reply_t osd_reply_secondary_list_t sec_list; osd_reply_show_config_t show_conf; osd_reply_rw_t rw; + osd_reply_sync_t sync; }; diff --git a/osd_peering.cpp b/osd_peering.cpp index a160ccdcb..e12a9a4a4 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -223,8 +223,8 @@ void osd_t::start_pg_peering(int pg_idx) .id = 1, .opcode = OSD_OP_SECONDARY_LIST, }, - .pgnum = 1, - .pgtotal = 1, + .pgnum = pg.pg_num, + .pgtotal = pg_count, }, }; op->callback = [ps, osd_num](osd_op_t *op) diff --git a/osd_peering_pg.h b/osd_peering_pg.h index cad21d0b3..4602b9812 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -101,8 +101,7 @@ struct pg_t uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2; uint64_t pg_num; uint64_t clean_count = 0; - // target_set = (role => osd_num). role numbers start with zero - // when PG is degraded, target_set only includes 2 OSDs + // target_set = (role => osd_num or UINT64_MAX if missing). role numbers start with zero std::vector target_set; // moved object map. by default, each object is considered to reside on the target_set. // this map stores all objects that differ. diff --git a/osd_primary.cpp b/osd_primary.cpp index 13eacbe5d..c65b857c6 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -1,6 +1,6 @@ #include "osd.h" -void osd_t::exec_primary(osd_op_t *cur_op) +void osd_t::exec_primary_read(osd_op_t *cur_op) { // read: read directly or read paired stripe(s), reconstruct, return // write: read paired stripe(s), modify, write @@ -10,6 +10,84 @@ void osd_t::exec_primary(osd_op_t *cur_op) // and... postpone other write requests to the same stripe until the completion of previous ones // // sync: sync peers, get unstable versions from somewhere, stabilize them + object_id oid = { + .inode = cur_op->op.rw.inode, + .stripe = (cur_op->op.rw.offset / (bs_block_size*2)) << STRIPE_SHIFT, + }; + uint64_t start = cur_op->op.rw.offset, end = cur_op->op.rw.offset + cur_op->op.rw.len; + unsigned pg_num = (oid % pg_count); // FIXME +1 + if (((end - 1) / (bs_block_size*2)) != oid.stripe || + (start % bs_disk_alignment) || (end % bs_disk_alignment) || + pg_num > pgs.size()) + { + // FIXME add separate magics + cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + cur_op->reply.hdr.id = cur_op->op.hdr.id; + cur_op->reply.hdr.opcode = cur_op->op.hdr.opcode; + cur_op->reply.hdr.retval = -EINVAL; + outbox_push(clients[cur_op->peer_fd], cur_op); + return; + } + // role -> start, end + void *buf = memalign(MEM_ALIGNMENT, cur_op->op.rw.len); + uint64_t reads[pgs[pg_num].pg_minsize*2] = { 0 }; + for (int role = 0; role < pgs[pg_num].pg_minsize; role++) + { + if (start < (1+role)*bs_block_size && end > role*bs_block_size) + { + reads[role*2] = start < role*bs_block_size ? 0 : start-role*bs_block_size; + reads[role*2+1] = end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size; + } + } + if (pgs[pg_num].pg_cursize == 3) + { + + } + else + { + // PG is degraded + uint64_t real_reads[pgs[pg_num].pg_size*2] = { 0 }; + memcpy(real_reads, reads, sizeof(uint64_t)*pgs[pg_num].pg_minsize*2); + for (int role = 0; role < pgs[pg_num].pg_minsize; role++) + { + if (reads[role*2+1] != 0 && pgs[pg_num].target_set[role] == UINT64_MAX) + { + // Stripe is missing. Extend read to other stripes. + // We need at least pg_minsize stripes to recover the lost part. + int exist = 0; + for (int j = 0; j < pgs[pg_num].pg_size; j++) + { + if (pgs[pg_num].target_set[j] != UINT64_MAX) + { + if (real_reads[j*2+1] == 0 || j >= pgs[pg_num].pg_minsize) + { + real_reads[j*2] = reads[role*2]; + real_reads[j*2+1] = reads[role*2+1]; + } + else + { + real_reads[j*2] = reads[j*2] < reads[role*2] ? reads[j*2] : reads[role*2]; + real_reads[j*2] = reads[j*2+1] > reads[role*2+1] ? reads[j*2+1] : reads[role*2+1]; + } + exist++; + if (exist >= pgs[pg_num].pg_minsize) + { + break; + } + } + } + } + } + } +} + +void osd_t::exec_primary_write(osd_op_t *cur_op) +{ + +} + +void osd_t::exec_primary_sync(osd_op_t *cur_op) +{ }