Add sync&stabilize test operation
parent
9af000e9de
commit
3061b8cf51
88
osd.cpp
88
osd.cpp
|
@ -307,27 +307,28 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void osd_t::blockstore_op_callback(osd_op_t *cur_op)
|
||||||
|
{
|
||||||
|
auto cl_it = clients.find(cur_op->peer_fd);
|
||||||
|
if (cl_it != clients.end())
|
||||||
|
{
|
||||||
|
auto & cl = cl_it->second;
|
||||||
|
if (cl.write_state == 0)
|
||||||
|
{
|
||||||
|
cl.write_state = CL_WRITE_READY;
|
||||||
|
write_ready_clients.push_back(cur_op->peer_fd);
|
||||||
|
}
|
||||||
|
cl.completions.push_back(cur_op);
|
||||||
|
ringloop->wakeup();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
delete cur_op;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void osd_t::enqueue_op(osd_op_t *cur_op)
|
void osd_t::enqueue_op(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op)
|
|
||||||
{
|
|
||||||
auto cl_it = clients.find(cur_op->peer_fd);
|
|
||||||
if (cl_it != clients.end())
|
|
||||||
{
|
|
||||||
auto & cl = cl_it->second;
|
|
||||||
if (cl.write_state == 0)
|
|
||||||
{
|
|
||||||
cl.write_state = CL_WRITE_READY;
|
|
||||||
write_ready_clients.push_back(cur_op->peer_fd);
|
|
||||||
}
|
|
||||||
cl.completions.push_back(cur_op);
|
|
||||||
ringloop->wakeup();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
delete cur_op;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if (cur_op->op.hdr.magic != SECONDARY_OSD_OP_MAGIC ||
|
if (cur_op->op.hdr.magic != SECONDARY_OSD_OP_MAGIC ||
|
||||||
cur_op->op.hdr.opcode < OSD_OP_MIN || cur_op->op.hdr.opcode > OSD_OP_MAX ||
|
cur_op->op.hdr.opcode < OSD_OP_MIN || cur_op->op.hdr.opcode > OSD_OP_MAX ||
|
||||||
(cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) &&
|
(cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ || cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) &&
|
||||||
|
@ -335,10 +336,57 @@ void osd_t::enqueue_op(osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
// Bad command
|
// Bad command
|
||||||
cur_op->bs_op.retval = -EINVAL;
|
cur_op->bs_op.retval = -EINVAL;
|
||||||
cur_op->bs_op.callback(&cur_op->bs_op);
|
blockstore_op_callback(cur_op);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (cur_op->op.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL)
|
||||||
|
{
|
||||||
|
// Sync and stabilize all objects
|
||||||
|
// This command is only valid for tests
|
||||||
|
// FIXME: Dedup between here & fio_engine
|
||||||
|
if (!allow_test_ops)
|
||||||
|
{
|
||||||
|
cur_op->bs_op.retval = -EINVAL;
|
||||||
|
blockstore_op_callback(cur_op);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
cur_op->bs_op.flags = OP_SYNC;
|
||||||
|
cur_op->bs_op.callback = [this, cur_op](blockstore_op_t *op)
|
||||||
|
{
|
||||||
|
auto & unstable_writes = bs->get_unstable_writes();
|
||||||
|
if (op->retval >= 0 && unstable_writes.size() > 0)
|
||||||
|
{
|
||||||
|
op->flags = OP_STABLE;
|
||||||
|
op->len = unstable_writes.size();
|
||||||
|
obj_ver_id *vers = new obj_ver_id[op->len];
|
||||||
|
op->buf = vers;
|
||||||
|
int i = 0;
|
||||||
|
for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++)
|
||||||
|
{
|
||||||
|
vers[i] = {
|
||||||
|
.oid = it->first,
|
||||||
|
.version = it->second,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
unstable_writes.clear();
|
||||||
|
op->callback = [this, cur_op](blockstore_op_t *op)
|
||||||
|
{
|
||||||
|
blockstore_op_callback(cur_op);
|
||||||
|
obj_ver_id *vers = (obj_ver_id*)op->buf;
|
||||||
|
delete[] vers;
|
||||||
|
};
|
||||||
|
bs->enqueue_op(op);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
blockstore_op_callback(cur_op);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
bs->enqueue_op(&cur_op->bs_op);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// FIXME: LIST is not a blockstore op yet
|
// FIXME: LIST is not a blockstore op yet
|
||||||
|
cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { blockstore_op_callback(cur_op); };
|
||||||
cur_op->bs_op.flags = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ
|
cur_op->bs_op.flags = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ
|
||||||
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE
|
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE
|
||||||
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC
|
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC
|
||||||
|
|
2
osd.h
2
osd.h
|
@ -77,6 +77,7 @@ class osd_t
|
||||||
std::string bind_address;
|
std::string bind_address;
|
||||||
int bind_port, listen_backlog;
|
int bind_port, listen_backlog;
|
||||||
int client_queue_depth = 128;
|
int client_queue_depth = 128;
|
||||||
|
bool allow_test_ops = true;
|
||||||
|
|
||||||
// fields
|
// fields
|
||||||
|
|
||||||
|
@ -101,6 +102,7 @@ class osd_t
|
||||||
void send_replies();
|
void send_replies();
|
||||||
void make_reply(osd_op_t *op);
|
void make_reply(osd_op_t *op);
|
||||||
void handle_send(ring_data_t *data, int peer_fd);
|
void handle_send(ring_data_t *data, int peer_fd);
|
||||||
|
void blockstore_op_callback(osd_op_t *cur_op);
|
||||||
public:
|
public:
|
||||||
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
|
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
|
||||||
~osd_t();
|
~osd_t();
|
||||||
|
|
|
@ -4,7 +4,6 @@
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
// Magic numbers
|
// Magic numbers
|
||||||
|
|
||||||
#define SECONDARY_OSD_OP_MAGIC 0x2bd7b10325434553l
|
#define SECONDARY_OSD_OP_MAGIC 0x2bd7b10325434553l
|
||||||
#define SECONDARY_OSD_REPLY_MAGIC 0xbaa699b87b434553l
|
#define SECONDARY_OSD_REPLY_MAGIC 0xbaa699b87b434553l
|
||||||
// Operation request headers and operation reply headers have fixed size after which comes data
|
// Operation request headers and operation reply headers have fixed size after which comes data
|
||||||
|
@ -17,8 +16,9 @@
|
||||||
#define OSD_OP_SECONDARY_SYNC 0x03
|
#define OSD_OP_SECONDARY_SYNC 0x03
|
||||||
#define OSD_OP_SECONDARY_STABILIZE 0x04
|
#define OSD_OP_SECONDARY_STABILIZE 0x04
|
||||||
#define OSD_OP_SECONDARY_DELETE 0x05
|
#define OSD_OP_SECONDARY_DELETE 0x05
|
||||||
#define OSD_OP_SECONDARY_LIST 0x10
|
#define OSD_OP_TEST_SYNC_STAB_ALL 0x06
|
||||||
#define OSD_OP_MAX 0x10
|
#define OSD_OP_SECONDARY_LIST 0x07
|
||||||
|
#define OSD_OP_MAX 0x07
|
||||||
// Alignment & limit for read/write operations
|
// Alignment & limit for read/write operations
|
||||||
#define OSD_RW_ALIGN 512
|
#define OSD_RW_ALIGN 512
|
||||||
#define OSD_RW_MAX 64*1024*1024
|
#define OSD_RW_MAX 64*1024*1024
|
||||||
|
|
Loading…
Reference in New Issue