forked from vitalif/vitastor
Add WRITE_STABLE to the secondary OSD for the upcoming replication support
parent
2e8c69fc5b
commit
3932c9b2e2
6
Makefile
6
Makefile
|
@ -15,7 +15,7 @@ libfio_blockstore.so: ./libblockstore.so fio_engine.o json11.o
|
|||
g++ $(CXXFLAGS) -shared -o $@ fio_engine.o json11.o ./libblockstore.so -ltcmalloc_minimal -luring
|
||||
|
||||
OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_flush.o osd_peering_pg.o \
|
||||
osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \
|
||||
osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o osd_ops.o pg_states.o \
|
||||
osd_rmw.o json11.o base64.o timerfd_manager.o epoll_manager.o
|
||||
osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS)
|
||||
g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring
|
||||
|
@ -37,7 +37,7 @@ libfio_sec_osd.so: fio_sec_osd.o rw_blocking.o
|
|||
g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ fio_sec_osd.o rw_blocking.o
|
||||
|
||||
FIO_CLUSTER_OBJS := cluster_client.o epoll_manager.o etcd_state_client.o \
|
||||
messenger.o msgr_send.o msgr_receive.o ringloop.o json11.o http_client.o pg_states.o timerfd_manager.o base64.o
|
||||
messenger.o msgr_send.o msgr_receive.o ringloop.o json11.o http_client.o osd_ops.o pg_states.o timerfd_manager.o base64.o
|
||||
libfio_cluster.so: fio_cluster.o $(FIO_CLUSTER_OBJS)
|
||||
g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ $< $(FIO_CLUSTER_OBJS) -luring
|
||||
|
||||
|
@ -118,6 +118,8 @@ osd_flush.o: osd_flush.cpp blockstore.h cpp-btree/btree_map.h epoll_manager.h et
|
|||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
osd_main.o: osd_main.cpp blockstore.h cpp-btree/btree_map.h epoll_manager.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
osd_ops.o: osd_ops.cpp object_id.h osd_id.h osd_ops.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
osd_peering.o: osd_peering.cpp base64.h blockstore.h cpp-btree/btree_map.h epoll_manager.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h
|
||||
g++ $(CXXFLAGS) -c -o $@ $<
|
||||
osd_peering_pg.o: osd_peering_pg.cpp cpp-btree/btree_map.h object_id.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h
|
||||
|
|
|
@ -186,7 +186,8 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
|
|||
cur_op->buf = memalign(MEM_ALIGNMENT, cur_op->req.sec_rw.len);
|
||||
cl->read_remaining = 0;
|
||||
}
|
||||
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE)
|
||||
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
|
||||
{
|
||||
if (cur_op->req.sec_rw.len > 0)
|
||||
cur_op->buf = memalign(MEM_ALIGNMENT, cur_op->req.sec_rw.len);
|
||||
|
|
|
@ -77,7 +77,8 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
|
|||
stats.op_stat_bytes[cl.write_op->req.hdr.opcode] += cl.write_op->req.rw.len;
|
||||
}
|
||||
else if (cl.write_op->req.hdr.opcode == OSD_OP_SEC_READ ||
|
||||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE)
|
||||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
||||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
|
||||
{
|
||||
stats.op_stat_bytes[cl.write_op->req.hdr.opcode] += cl.write_op->req.sec_rw.len;
|
||||
}
|
||||
|
@ -95,6 +96,7 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
|
|||
cl.send_list.push_back(cl.write_op->req.buf, OSD_PACKET_SIZE);
|
||||
if (cl.write_op->req.hdr.opcode == OSD_OP_WRITE ||
|
||||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
||||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE ||
|
||||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
|
||||
cl.write_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)
|
||||
{
|
||||
|
|
33
osd.cpp
33
osd.cpp
|
@ -6,23 +6,6 @@
|
|||
|
||||
#include "osd.h"
|
||||
|
||||
const char* osd_op_names[] = {
|
||||
"",
|
||||
"read",
|
||||
"write",
|
||||
"sync",
|
||||
"stabilize",
|
||||
"rollback",
|
||||
"delete",
|
||||
"sync_stab_all",
|
||||
"list",
|
||||
"show_config",
|
||||
"primary_read",
|
||||
"primary_write",
|
||||
"primary_sync",
|
||||
"primary_delete",
|
||||
};
|
||||
|
||||
osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop)
|
||||
{
|
||||
this->config = config;
|
||||
|
@ -205,10 +188,18 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
|||
inflight_ops++;
|
||||
if (cur_op->req.hdr.magic != SECONDARY_OSD_OP_MAGIC ||
|
||||
cur_op->req.hdr.opcode < OSD_OP_MIN || cur_op->req.hdr.opcode > OSD_OP_MAX ||
|
||||
(cur_op->req.hdr.opcode == OSD_OP_SEC_READ || cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE) &&
|
||||
(cur_op->req.sec_rw.len > OSD_RW_MAX || cur_op->req.sec_rw.len % bs_disk_alignment || cur_op->req.sec_rw.offset % bs_disk_alignment) ||
|
||||
(cur_op->req.hdr.opcode == OSD_OP_READ || cur_op->req.hdr.opcode == OSD_OP_WRITE || cur_op->req.hdr.opcode == OSD_OP_DELETE) &&
|
||||
(cur_op->req.rw.len > OSD_RW_MAX || cur_op->req.rw.len % bs_disk_alignment || cur_op->req.rw.offset % bs_disk_alignment))
|
||||
((cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) &&
|
||||
(cur_op->req.sec_rw.len > OSD_RW_MAX ||
|
||||
cur_op->req.sec_rw.len % bs_disk_alignment ||
|
||||
cur_op->req.sec_rw.offset % bs_disk_alignment)) ||
|
||||
((cur_op->req.hdr.opcode == OSD_OP_READ ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_WRITE ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_DELETE) &&
|
||||
(cur_op->req.rw.len > OSD_RW_MAX ||
|
||||
cur_op->req.rw.len % bs_disk_alignment ||
|
||||
cur_op->req.rw.offset % bs_disk_alignment)))
|
||||
{
|
||||
// Bad command
|
||||
finish_op(cur_op, -EINVAL);
|
||||
|
|
2
osd.h
2
osd.h
|
@ -38,8 +38,6 @@
|
|||
|
||||
//#define OSD_STUB
|
||||
|
||||
extern const char* osd_op_names[];
|
||||
|
||||
struct osd_object_id_t
|
||||
{
|
||||
osd_num_t osd_num;
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
#include "osd_ops.h"
|
||||
|
||||
const char* osd_op_names[] = {
|
||||
"",
|
||||
"read",
|
||||
"write",
|
||||
"write_stable",
|
||||
"sync",
|
||||
"stabilize",
|
||||
"rollback",
|
||||
"delete",
|
||||
"sync_stab_all",
|
||||
"list",
|
||||
"show_config",
|
||||
"primary_read",
|
||||
"primary_write",
|
||||
"primary_sync",
|
||||
"primary_delete",
|
||||
};
|
27
osd_ops.h
27
osd_ops.h
|
@ -12,18 +12,19 @@
|
|||
#define OSD_OP_MIN 1
|
||||
#define OSD_OP_SEC_READ 1
|
||||
#define OSD_OP_SEC_WRITE 2
|
||||
#define OSD_OP_SEC_SYNC 3
|
||||
#define OSD_OP_SEC_STABILIZE 4
|
||||
#define OSD_OP_SEC_ROLLBACK 5
|
||||
#define OSD_OP_SEC_DELETE 6
|
||||
#define OSD_OP_TEST_SYNC_STAB_ALL 7
|
||||
#define OSD_OP_SEC_LIST 8
|
||||
#define OSD_OP_SHOW_CONFIG 9
|
||||
#define OSD_OP_READ 10
|
||||
#define OSD_OP_WRITE 11
|
||||
#define OSD_OP_SYNC 12
|
||||
#define OSD_OP_DELETE 13
|
||||
#define OSD_OP_MAX 13
|
||||
#define OSD_OP_SEC_WRITE_STABLE 3
|
||||
#define OSD_OP_SEC_SYNC 4
|
||||
#define OSD_OP_SEC_STABILIZE 5
|
||||
#define OSD_OP_SEC_ROLLBACK 6
|
||||
#define OSD_OP_SEC_DELETE 7
|
||||
#define OSD_OP_TEST_SYNC_STAB_ALL 8
|
||||
#define OSD_OP_SEC_LIST 9
|
||||
#define OSD_OP_SHOW_CONFIG 10
|
||||
#define OSD_OP_READ 11
|
||||
#define OSD_OP_WRITE 12
|
||||
#define OSD_OP_SYNC 13
|
||||
#define OSD_OP_DELETE 14
|
||||
#define OSD_OP_MAX 14
|
||||
// Alignment & limit for read/write operations
|
||||
#ifndef MEM_ALIGNMENT
|
||||
#define MEM_ALIGNMENT 512
|
||||
|
@ -202,3 +203,5 @@ union osd_any_reply_t
|
|||
osd_reply_sync_t sync;
|
||||
uint8_t buf[OSD_PACKET_SIZE];
|
||||
};
|
||||
|
||||
extern const char* osd_op_names[];
|
||||
|
|
|
@ -5,7 +5,8 @@
|
|||
void osd_t::secondary_op_callback(osd_op_t *op)
|
||||
{
|
||||
if (op->req.hdr.opcode == OSD_OP_SEC_READ ||
|
||||
op->req.hdr.opcode == OSD_OP_SEC_WRITE)
|
||||
op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
||||
op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
|
||||
{
|
||||
op->reply.sec_rw.version = op->bs_op->version;
|
||||
}
|
||||
|
@ -40,14 +41,16 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
|
|||
cur_op->bs_op->callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
|
||||
cur_op->bs_op->opcode = (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ? BS_OP_READ
|
||||
: (cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ? BS_OP_WRITE
|
||||
: (cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE ? BS_OP_WRITE_STABLE
|
||||
: (cur_op->req.hdr.opcode == OSD_OP_SEC_SYNC ? BS_OP_SYNC
|
||||
: (cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ? BS_OP_STABLE
|
||||
: (cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK ? BS_OP_ROLLBACK
|
||||
: (cur_op->req.hdr.opcode == OSD_OP_SEC_DELETE ? BS_OP_DELETE
|
||||
: (cur_op->req.hdr.opcode == OSD_OP_SEC_LIST ? BS_OP_LIST
|
||||
: -1)))))));
|
||||
: -1))))))));
|
||||
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE)
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
|
||||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
|
||||
{
|
||||
cur_op->bs_op->oid = cur_op->req.sec_rw.oid;
|
||||
cur_op->bs_op->version = cur_op->req.sec_rw.version;
|
||||
|
|
|
@ -141,7 +141,7 @@ void run_stub(int peer_fd)
|
|||
if (r < op.sec_rw.len)
|
||||
break;
|
||||
}
|
||||
else if (op.hdr.opcode == OSD_OP_SEC_WRITE)
|
||||
else if (op.hdr.opcode == OSD_OP_SEC_WRITE || op.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
|
||||
{
|
||||
buf = malloc(op.sec_rw.len);
|
||||
r = read_blocking(peer_fd, buf, op.sec_rw.len);
|
||||
|
|
|
@ -111,7 +111,7 @@ void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op)
|
|||
op->buf = malloc(op->req.sec_rw.len);
|
||||
op->iov.push_back(op->buf, op->req.sec_rw.len);
|
||||
}
|
||||
else if (op->req.hdr.opcode == OSD_OP_SEC_WRITE)
|
||||
else if (op->req.hdr.opcode == OSD_OP_SEC_WRITE || op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
|
||||
{
|
||||
op->reply.hdr.retval = op->req.sec_rw.len;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue