From 3061b8cf51ff3a746a219bd8f6fa014e6e0b84b5 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 15 Dec 2019 15:30:51 +0300 Subject: [PATCH] Add sync&stabilize test operation --- osd.cpp | 88 ++++++++++++++++++++++++++++++++++++++++++------------- osd.h | 2 ++ osd_ops.h | 6 ++-- 3 files changed, 73 insertions(+), 23 deletions(-) diff --git a/osd.cpp b/osd.cpp index 0e7aa0cf..a03a28dd 100644 --- a/osd.cpp +++ b/osd.cpp @@ -307,27 +307,28 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) } } +void osd_t::blockstore_op_callback(osd_op_t *cur_op) +{ + auto cl_it = clients.find(cur_op->peer_fd); + if (cl_it != clients.end()) + { + auto & cl = cl_it->second; + if (cl.write_state == 0) + { + cl.write_state = CL_WRITE_READY; + write_ready_clients.push_back(cur_op->peer_fd); + } + cl.completions.push_back(cur_op); + ringloop->wakeup(); + } + else + { + delete cur_op; + } +} + void osd_t::enqueue_op(osd_op_t *cur_op) { - cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) - { - auto cl_it = clients.find(cur_op->peer_fd); - if (cl_it != clients.end()) - { - auto & cl = cl_it->second; - if (cl.write_state == 0) - { - cl.write_state = CL_WRITE_READY; - write_ready_clients.push_back(cur_op->peer_fd); - } - cl.completions.push_back(cur_op); - ringloop->wakeup(); - } - else - { - delete cur_op; - } - }; if (cur_op->op.hdr.magic != SECONDARY_OSD_OP_MAGIC || cur_op->op.hdr.opcode < OSD_OP_MIN || cur_op->op.hdr.opcode > OSD_OP_MAX || (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) && @@ -335,10 +336,57 @@ void osd_t::enqueue_op(osd_op_t *cur_op) { // Bad command cur_op->bs_op.retval = -EINVAL; - cur_op->bs_op.callback(&cur_op->bs_op); + blockstore_op_callback(cur_op); + return; + } + if (cur_op->op.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL) + { + // Sync and stabilize all objects + // This command is only valid for tests + // FIXME: Dedup between here & fio_engine + if (!allow_test_ops) + { + cur_op->bs_op.retval = -EINVAL; + blockstore_op_callback(cur_op); + return; + } + cur_op->bs_op.flags = OP_SYNC; + cur_op->bs_op.callback = [this, cur_op](blockstore_op_t *op) + { + auto & unstable_writes = bs->get_unstable_writes(); + if (op->retval >= 0 && unstable_writes.size() > 0) + { + op->flags = OP_STABLE; + op->len = unstable_writes.size(); + obj_ver_id *vers = new obj_ver_id[op->len]; + op->buf = vers; + int i = 0; + for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++) + { + vers[i] = { + .oid = it->first, + .version = it->second, + }; + } + unstable_writes.clear(); + op->callback = [this, cur_op](blockstore_op_t *op) + { + blockstore_op_callback(cur_op); + obj_ver_id *vers = (obj_ver_id*)op->buf; + delete[] vers; + }; + bs->enqueue_op(op); + } + else + { + blockstore_op_callback(cur_op); + } + }; + bs->enqueue_op(&cur_op->bs_op); return; } // FIXME: LIST is not a blockstore op yet + cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { blockstore_op_callback(cur_op); }; cur_op->bs_op.flags = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE : (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC diff --git a/osd.h b/osd.h index bcc12b11..d0788ba1 100644 --- a/osd.h +++ b/osd.h @@ -77,6 +77,7 @@ class osd_t std::string bind_address; int bind_port, listen_backlog; int client_queue_depth = 128; + bool allow_test_ops = true; // fields @@ -101,6 +102,7 @@ class osd_t void send_replies(); void make_reply(osd_op_t *op); void handle_send(ring_data_t *data, int peer_fd); + void blockstore_op_callback(osd_op_t *cur_op); public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); ~osd_t(); diff --git a/osd_ops.h b/osd_ops.h index fca97ad1..3f99f3d3 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -4,7 +4,6 @@ #include // Magic numbers - #define SECONDARY_OSD_OP_MAGIC 0x2bd7b10325434553l #define SECONDARY_OSD_REPLY_MAGIC 0xbaa699b87b434553l // Operation request headers and operation reply headers have fixed size after which comes data @@ -17,8 +16,9 @@ #define OSD_OP_SECONDARY_SYNC 0x03 #define OSD_OP_SECONDARY_STABILIZE 0x04 #define OSD_OP_SECONDARY_DELETE 0x05 -#define OSD_OP_SECONDARY_LIST 0x10 -#define OSD_OP_MAX 0x10 +#define OSD_OP_TEST_SYNC_STAB_ALL 0x06 +#define OSD_OP_SECONDARY_LIST 0x07 +#define OSD_OP_MAX 0x07 // Alignment & limit for read/write operations #define OSD_RW_ALIGN 512 #define OSD_RW_MAX 64*1024*1024