forked from vitalif/vitastor
Rename OP_ to BS_OP_
parent
19abe6227e
commit
d3d21e6e0f
16
blockstore.h
16
blockstore.h
|
@ -19,19 +19,19 @@
|
|||
#define MAX_BLOCK_SIZE 128*1024*1024
|
||||
#define DISK_ALIGNMENT 512
|
||||
|
||||
#define OP_READ 1
|
||||
#define OP_WRITE 2
|
||||
#define OP_SYNC 3
|
||||
#define OP_STABLE 4
|
||||
#define OP_DELETE 5
|
||||
#define OP_TYPE_MASK 0x7
|
||||
#define BS_OP_READ 1
|
||||
#define BS_OP_WRITE 2
|
||||
#define BS_OP_SYNC 3
|
||||
#define BS_OP_STABLE 4
|
||||
#define BS_OP_DELETE 5
|
||||
#define BS_OP_TYPE_MASK 0x7
|
||||
|
||||
#define BS_OP_PRIVATE_DATA_SIZE 256
|
||||
|
||||
struct blockstore_op_t
|
||||
{
|
||||
// flags contain operation type and possibly other flags
|
||||
uint64_t flags;
|
||||
// operation
|
||||
uint64_t opcode;
|
||||
// finish callback
|
||||
std::function<void (blockstore_op_t*)> callback;
|
||||
// For reads, writes & deletes: oid is the requested object
|
||||
|
|
|
@ -123,8 +123,8 @@ void blockstore_impl_t::loop()
|
|||
}
|
||||
else if (PRIV(op)->wait_for)
|
||||
{
|
||||
if ((op->flags & OP_TYPE_MASK) == OP_WRITE ||
|
||||
(op->flags & OP_TYPE_MASK) == OP_DELETE)
|
||||
if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_WRITE ||
|
||||
(op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE)
|
||||
{
|
||||
has_writes = 2;
|
||||
}
|
||||
|
@ -134,12 +134,12 @@ void blockstore_impl_t::loop()
|
|||
unsigned ring_space = ringloop->space_left();
|
||||
unsigned prev_sqe_pos = ringloop->save();
|
||||
int dequeue_op = 0;
|
||||
if ((op->flags & OP_TYPE_MASK) == OP_READ)
|
||||
if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_READ)
|
||||
{
|
||||
dequeue_op = dequeue_read(op);
|
||||
}
|
||||
else if ((op->flags & OP_TYPE_MASK) == OP_WRITE ||
|
||||
(op->flags & OP_TYPE_MASK) == OP_DELETE)
|
||||
else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_WRITE ||
|
||||
(op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE)
|
||||
{
|
||||
if (has_writes == 2)
|
||||
{
|
||||
|
@ -149,7 +149,7 @@ void blockstore_impl_t::loop()
|
|||
dequeue_op = dequeue_write(op);
|
||||
has_writes = dequeue_op ? 1 : 2;
|
||||
}
|
||||
else if ((op->flags & OP_TYPE_MASK) == OP_SYNC)
|
||||
else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_SYNC)
|
||||
{
|
||||
// wait for all small writes to be submitted
|
||||
// wait for all big writes to complete, submit data device fsync
|
||||
|
@ -162,7 +162,7 @@ void blockstore_impl_t::loop()
|
|||
}
|
||||
dequeue_op = dequeue_sync(op);
|
||||
}
|
||||
else if ((op->flags & OP_TYPE_MASK) == OP_STABLE)
|
||||
else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_STABLE)
|
||||
{
|
||||
dequeue_op = dequeue_stable(op);
|
||||
}
|
||||
|
@ -207,7 +207,7 @@ bool blockstore_impl_t::is_safe_to_stop()
|
|||
{
|
||||
// We should sync the blockstore before unmounting
|
||||
blockstore_op_t *op = new blockstore_op_t;
|
||||
op->flags = OP_SYNC;
|
||||
op->opcode = BS_OP_SYNC;
|
||||
op->buf = NULL;
|
||||
op->callback = [](blockstore_op_t *op)
|
||||
{
|
||||
|
@ -279,10 +279,10 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
|
|||
|
||||
void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
|
||||
{
|
||||
int type = op->flags & OP_TYPE_MASK;
|
||||
if (type < OP_READ || type > OP_DELETE || (type == OP_READ || type == OP_WRITE) &&
|
||||
int type = op->opcode & BS_OP_TYPE_MASK;
|
||||
if (type < BS_OP_READ || type > BS_OP_DELETE || (type == BS_OP_READ || type == BS_OP_WRITE) &&
|
||||
(op->offset >= block_size || op->len > block_size-op->offset || (op->len % DISK_ALIGNMENT)) ||
|
||||
readonly && type != OP_READ)
|
||||
readonly && type != BS_OP_READ)
|
||||
{
|
||||
// Basic verification not passed
|
||||
op->retval = -EINVAL;
|
||||
|
@ -295,7 +295,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
|
|||
PRIV(op)->sync_state = 0;
|
||||
PRIV(op)->pending_ops = 0;
|
||||
submit_queue.push_back(op);
|
||||
if ((op->flags & OP_TYPE_MASK) == OP_WRITE)
|
||||
if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_WRITE)
|
||||
{
|
||||
enqueue_write(op);
|
||||
}
|
||||
|
|
|
@ -2,8 +2,8 @@
|
|||
|
||||
void blockstore_impl_t::enqueue_write(blockstore_op_t *op)
|
||||
{
|
||||
// Assign version number
|
||||
bool found = false, deleted = false, is_del = (op->flags & OP_TYPE_MASK) == OP_DELETE;
|
||||
// Check or assign version number
|
||||
bool found = false, deleted = false, is_del = (op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE;
|
||||
if (dirty_db.size() > 0)
|
||||
{
|
||||
auto dirty_it = dirty_db.upper_bound((obj_ver_id){
|
||||
|
|
|
@ -192,12 +192,13 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
|
|||
switch (io->ddir)
|
||||
{
|
||||
case DDIR_READ:
|
||||
op->flags = OP_READ;
|
||||
op->opcode = BS_OP_READ;
|
||||
op->buf = io->xfer_buf;
|
||||
op->oid = {
|
||||
.inode = 1,
|
||||
.stripe = io->offset >> bsd->bs->get_block_order(),
|
||||
};
|
||||
op->version = UINT64_MAX; // last unstable
|
||||
op->offset = io->offset % bsd->bs->get_block_size();
|
||||
op->len = io->xfer_buflen;
|
||||
op->callback = [io, n](blockstore_op_t *op)
|
||||
|
@ -213,12 +214,13 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
|
|||
};
|
||||
break;
|
||||
case DDIR_WRITE:
|
||||
op->flags = OP_WRITE;
|
||||
op->opcode = BS_OP_WRITE;
|
||||
op->buf = io->xfer_buf;
|
||||
op->oid = {
|
||||
.inode = 1,
|
||||
.stripe = io->offset >> bsd->bs->get_block_order(),
|
||||
};
|
||||
op->version = 0; // assign automatically
|
||||
op->offset = io->offset % bsd->bs->get_block_size();
|
||||
op->len = io->xfer_buflen;
|
||||
op->callback = [io, n](blockstore_op_t *op)
|
||||
|
@ -234,14 +236,14 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
|
|||
};
|
||||
break;
|
||||
case DDIR_SYNC:
|
||||
op->flags = OP_SYNC;
|
||||
op->opcode = BS_OP_SYNC;
|
||||
op->callback = [io, n](blockstore_op_t *op)
|
||||
{
|
||||
bs_data *bsd = (bs_data*)io->engine_data;
|
||||
auto & unstable_writes = bsd->bs->get_unstable_writes();
|
||||
if (op->retval >= 0 && unstable_writes.size() > 0)
|
||||
{
|
||||
op->flags = OP_STABLE;
|
||||
op->opcode = BS_OP_STABLE;
|
||||
op->len = unstable_writes.size();
|
||||
obj_ver_id *vers = new obj_ver_id[op->len];
|
||||
op->buf = vers;
|
||||
|
@ -287,7 +289,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
|
|||
}
|
||||
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("+++ %s %llx n=%d\n", op->flags == OP_READ ? "OP_READ" : (op->flags == OP_WRITE ? "OP_WRITE" : "OP_SYNC"), io, n);
|
||||
printf("+++ %s %llx n=%d\n", op->opcode == OP_READ ? "OP_READ" : (op->opcode == OP_WRITE ? "OP_WRITE" : "OP_SYNC"), io, n);
|
||||
#endif
|
||||
io->error = 0;
|
||||
bsd->inflight++;
|
||||
|
|
26
osd.cpp
26
osd.cpp
|
@ -312,7 +312,7 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
|
|||
}
|
||||
}
|
||||
|
||||
void osd_t::blockstore_op_callback(osd_op_t *cur_op)
|
||||
void osd_t::secondary_op_callback(osd_op_t *cur_op)
|
||||
{
|
||||
auto cl_it = clients.find(cur_op->peer_fd);
|
||||
if (cl_it != clients.end())
|
||||
|
@ -341,7 +341,7 @@ void osd_t::enqueue_op(osd_op_t *cur_op)
|
|||
{
|
||||
// Bad command
|
||||
cur_op->bs_op.retval = -EINVAL;
|
||||
blockstore_op_callback(cur_op);
|
||||
secondary_op_callback(cur_op);
|
||||
return;
|
||||
}
|
||||
if (cur_op->op.hdr.opcode == OSD_OP_TEST_SYNC_STAB_ALL)
|
||||
|
@ -352,16 +352,16 @@ void osd_t::enqueue_op(osd_op_t *cur_op)
|
|||
if (!allow_test_ops)
|
||||
{
|
||||
cur_op->bs_op.retval = -EINVAL;
|
||||
blockstore_op_callback(cur_op);
|
||||
secondary_op_callback(cur_op);
|
||||
return;
|
||||
}
|
||||
cur_op->bs_op.flags = OP_SYNC;
|
||||
cur_op->bs_op.opcode = BS_OP_SYNC;
|
||||
cur_op->bs_op.callback = [this, cur_op](blockstore_op_t *op)
|
||||
{
|
||||
auto & unstable_writes = bs->get_unstable_writes();
|
||||
if (op->retval >= 0 && unstable_writes.size() > 0)
|
||||
{
|
||||
op->flags = OP_STABLE;
|
||||
op->opcode = BS_OP_STABLE;
|
||||
op->len = unstable_writes.size();
|
||||
obj_ver_id *vers = new obj_ver_id[op->len];
|
||||
op->buf = vers;
|
||||
|
@ -376,7 +376,7 @@ void osd_t::enqueue_op(osd_op_t *cur_op)
|
|||
unstable_writes.clear();
|
||||
op->callback = [this, cur_op](blockstore_op_t *op)
|
||||
{
|
||||
blockstore_op_callback(cur_op);
|
||||
secondary_op_callback(cur_op);
|
||||
obj_ver_id *vers = (obj_ver_id*)op->buf;
|
||||
delete[] vers;
|
||||
};
|
||||
|
@ -384,19 +384,19 @@ void osd_t::enqueue_op(osd_op_t *cur_op)
|
|||
}
|
||||
else
|
||||
{
|
||||
blockstore_op_callback(cur_op);
|
||||
secondary_op_callback(cur_op);
|
||||
}
|
||||
};
|
||||
bs->enqueue_op(&cur_op->bs_op);
|
||||
return;
|
||||
}
|
||||
// FIXME: LIST is not a blockstore op yet
|
||||
cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { blockstore_op_callback(cur_op); };
|
||||
cur_op->bs_op.flags = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? OP_READ
|
||||
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? OP_WRITE
|
||||
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_SYNC ? OP_SYNC
|
||||
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? OP_STABLE
|
||||
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE ? OP_DELETE
|
||||
cur_op->bs_op.callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
|
||||
cur_op->bs_op.opcode = (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ? BS_OP_READ
|
||||
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE ? BS_OP_WRITE
|
||||
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_SYNC ? BS_OP_SYNC
|
||||
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ? BS_OP_STABLE
|
||||
: (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE ? BS_OP_DELETE
|
||||
: -1)))));
|
||||
if (cur_op->op.hdr.opcode == OSD_OP_SECONDARY_READ ||
|
||||
cur_op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE)
|
||||
|
|
2
osd.h
2
osd.h
|
@ -103,7 +103,7 @@ class osd_t
|
|||
void send_replies();
|
||||
void make_reply(osd_op_t *op);
|
||||
void handle_send(ring_data_t *data, int peer_fd);
|
||||
void blockstore_op_callback(osd_op_t *cur_op);
|
||||
void secondary_op_callback(osd_op_t *cur_op);
|
||||
public:
|
||||
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
|
||||
~osd_t();
|
||||
|
|
|
@ -36,7 +36,7 @@ int main(int narg, char *args[])
|
|||
if (bs->is_started())
|
||||
{
|
||||
printf("init completed\n");
|
||||
op.flags = OP_WRITE;
|
||||
op.opcode = BS_OP_WRITE;
|
||||
op.oid = { .inode = 1, .stripe = 0 };
|
||||
op.version = 0;
|
||||
op.offset = 4096;
|
||||
|
@ -51,14 +51,14 @@ int main(int narg, char *args[])
|
|||
{
|
||||
printf("version %lu written, syncing\n", op.version);
|
||||
version = op.version;
|
||||
op.flags = OP_SYNC;
|
||||
op.opcode = BS_OP_SYNC;
|
||||
bs->enqueue_op(&op);
|
||||
main_state = 3;
|
||||
}
|
||||
else if (main_state == 4)
|
||||
{
|
||||
printf("stabilizing version %lu\n", version);
|
||||
op.flags = OP_STABLE;
|
||||
op.opcode = BS_OP_STABLE;
|
||||
op.len = 1;
|
||||
*((obj_ver_id*)op.buf) = {
|
||||
.oid = { .inode = 1, .stripe = 0 },
|
||||
|
|
Loading…
Reference in New Issue