diff --git a/blockstore.h b/blockstore.h index 96c47122..e965b5ca 100644 --- a/blockstore.h +++ b/blockstore.h @@ -19,19 +19,19 @@ #define MAX_BLOCK_SIZE 128*1024*1024 #define DISK_ALIGNMENT 512 -#define OP_READ 1 -#define OP_WRITE 2 -#define OP_SYNC 3 -#define OP_STABLE 4 -#define OP_DELETE 5 -#define OP_TYPE_MASK 0x7 +#define BS_OP_READ 1 +#define BS_OP_WRITE 2 +#define BS_OP_SYNC 3 +#define BS_OP_STABLE 4 +#define BS_OP_DELETE 5 +#define BS_OP_TYPE_MASK 0x7 #define BS_OP_PRIVATE_DATA_SIZE 256 struct blockstore_op_t { - // flags contain operation type and possibly other flags - uint64_t flags; + // operation + uint64_t opcode; // finish callback std::function callback; // For reads, writes & deletes: oid is the requested object diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index c9d412d7..84e60ab5 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -123,8 +123,8 @@ void blockstore_impl_t::loop() } else if (PRIV(op)->wait_for) { - if ((op->flags & OP_TYPE_MASK) == OP_WRITE || - (op->flags & OP_TYPE_MASK) == OP_DELETE) + if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_WRITE || + (op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE) { has_writes = 2; } @@ -134,12 +134,12 @@ void blockstore_impl_t::loop() unsigned ring_space = ringloop->space_left(); unsigned prev_sqe_pos = ringloop->save(); int dequeue_op = 0; - if ((op->flags & OP_TYPE_MASK) == OP_READ) + if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_READ) { dequeue_op = dequeue_read(op); } - else if ((op->flags & OP_TYPE_MASK) == OP_WRITE || - (op->flags & OP_TYPE_MASK) == OP_DELETE) + else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_WRITE || + (op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE) { if (has_writes == 2) { @@ -149,7 +149,7 @@ void blockstore_impl_t::loop() dequeue_op = dequeue_write(op); has_writes = dequeue_op ? 1 : 2; } - else if ((op->flags & OP_TYPE_MASK) == OP_SYNC) + else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_SYNC) { // wait for all small writes to be submitted // wait for all big writes to complete, submit data device fsync @@ -162,7 +162,7 @@ void blockstore_impl_t::loop() } dequeue_op = dequeue_sync(op); } - else if ((op->flags & OP_TYPE_MASK) == OP_STABLE) + else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_STABLE) { dequeue_op = dequeue_stable(op); } @@ -207,7 +207,7 @@ bool blockstore_impl_t::is_safe_to_stop() { // We should sync the blockstore before unmounting blockstore_op_t *op = new blockstore_op_t; - op->flags = OP_SYNC; + op->opcode = BS_OP_SYNC; op->buf = NULL; op->callback = [](blockstore_op_t *op) { @@ -279,10 +279,10 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op) void blockstore_impl_t::enqueue_op(blockstore_op_t *op) { - int type = op->flags & OP_TYPE_MASK; - if (type < OP_READ || type > OP_DELETE || (type == OP_READ || type == OP_WRITE) && + int type = op->opcode & BS_OP_TYPE_MASK; + if (type < BS_OP_READ || type > BS_OP_DELETE || (type == BS_OP_READ || type == BS_OP_WRITE) && (op->offset >= block_size || op->len > block_size-op->offset || (op->len % DISK_ALIGNMENT)) || - readonly && type != OP_READ) + readonly && type != BS_OP_READ) { // Basic verification not passed op->retval = -EINVAL; @@ -295,7 +295,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op) PRIV(op)->sync_state = 0; PRIV(op)->pending_ops = 0; submit_queue.push_back(op); - if ((op->flags & OP_TYPE_MASK) == OP_WRITE) + if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_WRITE) { enqueue_write(op); } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 24ada2dc..9148797d 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -2,8 +2,8 @@ void blockstore_impl_t::enqueue_write(blockstore_op_t *op) { - // Assign version number - bool found = false, deleted = false, is_del = (op->flags & OP_TYPE_MASK) == OP_DELETE; + // Check or assign version number + bool found = false, deleted = false, is_del = (op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE; if (dirty_db.size() > 0) { auto dirty_it = dirty_db.upper_bound((obj_ver_id){ diff --git a/fio_engine.cpp b/fio_engine.cpp index 2ce79620..1833469d 100644 --- a/fio_engine.cpp +++ b/fio_engine.cpp @@ -192,12 +192,13 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) switch (io->ddir) { case DDIR_READ: - op->flags = OP_READ; + op->opcode = BS_OP_READ; op->buf = io->xfer_buf; op->oid = { .inode = 1, .stripe = io->offset >> bsd->bs->get_block_order(), }; + op->version = UINT64_MAX; // last unstable op->offset = io->offset % bsd->bs->get_block_size(); op->len = io->xfer_buflen; op->callback = [io, n](blockstore_op_t *op) @@ -213,12 +214,13 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) }; break; case DDIR_WRITE: - op->flags = OP_WRITE; + op->opcode = BS_OP_WRITE; op->buf = io->xfer_buf; op->oid = { .inode = 1, .stripe = io->offset >> bsd->bs->get_block_order(), }; + op->version = 0; // assign automatically op->offset = io->offset % bsd->bs->get_block_size(); op->len = io->xfer_buflen; op->callback = [io, n](blockstore_op_t *op) @@ -234,14 +236,14 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) }; break; case DDIR_SYNC: - op->flags = OP_SYNC; + op->opcode = BS_OP_SYNC; op->callback = [io, n](blockstore_op_t *op) { bs_data *bsd = (bs_data*)io->engine_data; auto & unstable_writes = bsd->bs->get_unstable_writes(); if (op->retval >= 0 && unstable_writes.size() > 0) { - op->flags = OP_STABLE; + op->opcode = BS_OP_STABLE; op->len = unstable_writes.size(); obj_ver_id *vers = new obj_ver_id[op->len]; op->buf = vers; @@ -287,7 +289,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) } #ifdef BLOCKSTORE_DEBUG - printf("+++ %s %llx n=%d\n", op->flags == OP_READ ? "OP_READ" : (op->flags == OP_WRITE ? "OP_WRITE" : "OP_SYNC"), io, n); + printf("+++ %s %llx n=%d\n", op->opcode == OP_READ ? "OP_READ" : (op->opcode == OP_WRITE ? "OP_WRITE" : "OP_SYNC"), io, n); #endif io->error = 0; bsd->inflight++; diff --git a/osd.cpp b/osd.cpp index 0d54b203..50688be3 100644 --- a/osd.cpp +++ b/osd.cpp @@ -312,7 +312,7 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) } } -void osd_t::blockstore_op_callback(osd_op_t *cur_op) +void osd_t::secondary_op_callback(osd_op_t *cur_op) { auto cl_it = clients.find(cur_op->peer_fd); if (cl_it != clients.end()) @@ -341,7 +341,7 @@ void osd_t::enqueue_op(osd_op_t *cur_op) { // Bad command cur_op->bs_op.retval = -EINVAL; - blockstore_op_callback(cur_op); + secondary_op_callback(cur_op); return; } if (cur_op->op.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL) @@ -352,16 +352,16 @@ void osd_t::enqueue_op(osd_op_t *cur_op) if (!allow_test_ops) { cur_op->bs_op.retval = -EINVAL; - blockstore_op_callback(cur_op); + secondary_op_callback(cur_op); return; } - cur_op->bs_op.flags = OP_SYNC; + cur_op->bs_op.opcode = BS_OP_SYNC; cur_op->bs_op.callback = [this, cur_op](blockstore_op_t *op) { auto & unstable_writes = bs->get_unstable_writes(); if (op->retval >= 0 && unstable_writes.size() > 0) { - op->flags = OP_STABLE; + op->opcode = BS_OP_STABLE; op->len = unstable_writes.size(); obj_ver_id *vers = new obj_ver_id[op->len]; op->buf = vers; @@ -376,7 +376,7 @@ void osd_t::enqueue_op(osd_op_t *cur_op) unstable_writes.clear(); op->callback = [this, cur_op](blockstore_op_t *op) { - blockstore_op_callback(cur_op); + secondary_op_callback(cur_op); obj_ver_id *vers = (obj_ver_id*)op->buf; delete[] vers; }; @@ -384,19 +384,19 @@ void osd_t::enqueue_op(osd_op_t *cur_op) } else { - blockstore_op_callback(cur_op); + secondary_op_callback(cur_op); } }; bs->enqueue_op(&cur_op->bs_op); return; } // FIXME: LIST is not a blockstore op yet - cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { blockstore_op_callback(cur_op); }; - cur_op->bs_op.flags = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ - : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE - : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC - : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? OP_STABLE - : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE ? OP_DELETE + cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); }; + cur_op->bs_op.opcode = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? BS_OP_READ + : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? BS_OP_WRITE + : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_SYNC ? BS_OP_SYNC + : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? BS_OP_STABLE + : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE ? BS_OP_DELETE : -1))))); if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) diff --git a/osd.h b/osd.h index 76acd754..95a908c6 100644 --- a/osd.h +++ b/osd.h @@ -103,7 +103,7 @@ class osd_t void send_replies(); void make_reply(osd_op_t *op); void handle_send(ring_data_t *data, int peer_fd); - void blockstore_op_callback(osd_op_t *cur_op); + void secondary_op_callback(osd_op_t *cur_op); public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); ~osd_t(); diff --git a/test_blockstore.cpp b/test_blockstore.cpp index 66a57d38..7ce29741 100644 --- a/test_blockstore.cpp +++ b/test_blockstore.cpp @@ -36,7 +36,7 @@ int main(int narg, char *args[]) if (bs->is_started()) { printf("init completed\n"); - op.flags = OP_WRITE; + op.opcode = BS_OP_WRITE; op.oid = { .inode = 1, .stripe = 0 }; op.version = 0; op.offset = 4096; @@ -51,14 +51,14 @@ int main(int narg, char *args[]) { printf("version %lu written, syncing\n", op.version); version = op.version; - op.flags = OP_SYNC; + op.opcode = BS_OP_SYNC; bs->enqueue_op(&op); main_state = 3; } else if (main_state == 4) { printf("stabilizing version %lu\n", version); - op.flags = OP_STABLE; + op.opcode = BS_OP_STABLE; op.len = 1; *((obj_ver_id*)op.buf) = { .oid = { .inode = 1, .stripe = 0 },