forked from vitalif/vitastor
Implement event-driven PG peering
parent
327f310868
commit
a66b34e04d
2
Makefile
2
Makefile
|
@ -24,7 +24,7 @@ libblockstore.so: $(BLOCKSTORE_OBJS)
|
||||||
libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o
|
libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o
|
||||||
g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring
|
g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring
|
||||||
|
|
||||||
OSD_OBJS := osd.o osd_exec_secondary.o osd_receive.o osd_send.o osd_peering.o osd_peering_pg.o osd_primary.o json11.o
|
OSD_OBJS := osd.o osd_exec_secondary.o osd_receive.o osd_send.o osd_peering.o osd_peering_pg.o osd_primary.o json11.o timerfd_interval.o
|
||||||
osd_exec_secondary.o: osd_exec_secondary.cpp osd.h osd_ops.h
|
osd_exec_secondary.o: osd_exec_secondary.cpp osd.h osd_ops.h
|
||||||
g++ $(CXXFLAGS) -c -o $@ $<
|
g++ $(CXXFLAGS) -c -o $@ $<
|
||||||
osd_receive.o: osd_receive.cpp osd.h osd_ops.h
|
osd_receive.o: osd_receive.cpp osd.h osd_ops.h
|
||||||
|
|
5
osd.cpp
5
osd.cpp
|
@ -12,6 +12,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
||||||
this->config = config;
|
this->config = config;
|
||||||
this->bs = bs;
|
this->bs = bs;
|
||||||
this->ringloop = ringloop;
|
this->ringloop = ringloop;
|
||||||
|
this->tick_tfd = new timerfd_interval(ringloop, 1, []() {});
|
||||||
this->bs_block_size = bs->get_block_size();
|
this->bs_block_size = bs->get_block_size();
|
||||||
// FIXME: use bitmap granularity instead
|
// FIXME: use bitmap granularity instead
|
||||||
this->bs_disk_alignment = bs->get_disk_alignment();
|
this->bs_disk_alignment = bs->get_disk_alignment();
|
||||||
|
@ -85,6 +86,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
|
||||||
|
|
||||||
osd_t::~osd_t()
|
osd_t::~osd_t()
|
||||||
{
|
{
|
||||||
|
delete tick_tfd;
|
||||||
ringloop->unregister_consumer(consumer);
|
ringloop->unregister_consumer(consumer);
|
||||||
close(epoll_fd);
|
close(epoll_fd);
|
||||||
close(listen_fd);
|
close(listen_fd);
|
||||||
|
@ -229,7 +231,10 @@ void osd_t::stop_client(int peer_fd)
|
||||||
auto it = clients.find(peer_fd);
|
auto it = clients.find(peer_fd);
|
||||||
if (it->second.osd_num)
|
if (it->second.osd_num)
|
||||||
{
|
{
|
||||||
|
// FIXME cancel outbound operations
|
||||||
osd_peer_fds.erase(it->second.osd_num);
|
osd_peer_fds.erase(it->second.osd_num);
|
||||||
|
repeer_pgs(it->second.osd_num, false);
|
||||||
|
peering_state |= OSD_PEERING_PEERS;
|
||||||
}
|
}
|
||||||
for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
|
for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
|
||||||
{
|
{
|
||||||
|
|
9
osd.h
9
osd.h
|
@ -14,6 +14,7 @@
|
||||||
|
|
||||||
#include "blockstore.h"
|
#include "blockstore.h"
|
||||||
#include "ringloop.h"
|
#include "ringloop.h"
|
||||||
|
#include "timerfd_interval.h"
|
||||||
#include "osd_ops.h"
|
#include "osd_ops.h"
|
||||||
#include "osd_peering_pg.h"
|
#include "osd_peering_pg.h"
|
||||||
|
|
||||||
|
@ -34,6 +35,8 @@
|
||||||
|
|
||||||
#define PEER_CONNECTING 1
|
#define PEER_CONNECTING 1
|
||||||
#define PEER_CONNECTED 2
|
#define PEER_CONNECTED 2
|
||||||
|
#define OSD_PEERING_PEERS 1
|
||||||
|
#define OSD_PEERING_PGS 2
|
||||||
|
|
||||||
//#define OSD_STUB
|
//#define OSD_STUB
|
||||||
|
|
||||||
|
@ -186,6 +189,7 @@ class osd_t
|
||||||
blockstore_t *bs;
|
blockstore_t *bs;
|
||||||
uint32_t bs_block_size, bs_disk_alignment;
|
uint32_t bs_block_size, bs_disk_alignment;
|
||||||
ring_loop_t *ringloop;
|
ring_loop_t *ringloop;
|
||||||
|
timerfd_interval *tick_tfd;
|
||||||
|
|
||||||
int wait_state = 0;
|
int wait_state = 0;
|
||||||
int epoll_fd = 0;
|
int epoll_fd = 0;
|
||||||
|
@ -217,6 +221,7 @@ class osd_t
|
||||||
osd_peer_def_t parse_peer(std::string peer);
|
osd_peer_def_t parse_peer(std::string peer);
|
||||||
void init_primary();
|
void init_primary();
|
||||||
void handle_peers();
|
void handle_peers();
|
||||||
|
void repeer_pgs(osd_num_t osd_num, bool is_connected);
|
||||||
void start_pg_peering(int i);
|
void start_pg_peering(int i);
|
||||||
|
|
||||||
// op execution
|
// op execution
|
||||||
|
@ -235,8 +240,8 @@ class osd_t
|
||||||
void make_primary_reply(osd_op_t *op);
|
void make_primary_reply(osd_op_t *op);
|
||||||
void finish_primary_op(osd_op_t *cur_op, int retval);
|
void finish_primary_op(osd_op_t *cur_op, int retval);
|
||||||
void handle_primary_read_subop(osd_op_t *cur_op, int ok);
|
void handle_primary_read_subop(osd_op_t *cur_op, int ok);
|
||||||
int extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *target_set, int minsize, int size);
|
int extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size);
|
||||||
void submit_read_subops(int read_pg_size, const uint64_t* target_set, osd_op_t *cur_op);
|
void submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
|
||||||
public:
|
public:
|
||||||
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
|
osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop);
|
||||||
~osd_t();
|
~osd_t();
|
||||||
|
|
212
osd_peering.cpp
212
osd_peering.cpp
|
@ -17,12 +17,13 @@ void osd_t::init_primary()
|
||||||
throw std::runtime_error("peer1 and peer2 osd numbers are the same");
|
throw std::runtime_error("peer1 and peer2 osd numbers are the same");
|
||||||
pgs.push_back((pg_t){
|
pgs.push_back((pg_t){
|
||||||
.state = PG_OFFLINE,
|
.state = PG_OFFLINE,
|
||||||
.pg_cursize = 2, // or 3
|
.pg_cursize = 0,
|
||||||
.pg_num = 1,
|
.pg_num = 1,
|
||||||
.target_set = { 1, 0, 3 }, // or { 1, 2, 3 }
|
.target_set = { 1, 2, 3 },
|
||||||
|
.cur_set = { 1, 0, 0 },
|
||||||
});
|
});
|
||||||
pg_count = 1;
|
pg_count = 1;
|
||||||
peering_state = 1;
|
peering_state = OSD_PEERING_PEERS;
|
||||||
}
|
}
|
||||||
|
|
||||||
osd_peer_def_t osd_t::parse_peer(std::string peer)
|
osd_peer_def_t osd_t::parse_peer(std::string peer)
|
||||||
|
@ -122,19 +123,19 @@ void osd_t::handle_connect_result(int peer_fd)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Peering loop
|
// Peering loop
|
||||||
// Ideally: Connect -> Ask & check config -> Start PG peering
|
|
||||||
void osd_t::handle_peers()
|
void osd_t::handle_peers()
|
||||||
{
|
{
|
||||||
if (peering_state & 1)
|
if (peering_state & OSD_PEERING_PEERS)
|
||||||
{
|
{
|
||||||
for (int i = 0; i < peers.size(); i++)
|
for (int i = 0; i < peers.size(); i++)
|
||||||
{
|
{
|
||||||
if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end() &&
|
if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end() &&
|
||||||
time(NULL) - peers[i].last_connect_attempt > 5)
|
time(NULL) - peers[i].last_connect_attempt > 5) // FIXME hardcode 5
|
||||||
{
|
{
|
||||||
peers[i].last_connect_attempt = time(NULL);
|
peers[i].last_connect_attempt = time(NULL);
|
||||||
connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](osd_num_t osd_num, int peer_fd)
|
connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](osd_num_t osd_num, int peer_fd)
|
||||||
{
|
{
|
||||||
|
// FIXME: Check peer config after connecting
|
||||||
if (peer_fd < 0)
|
if (peer_fd < 0)
|
||||||
{
|
{
|
||||||
printf("Failed to connect to peer OSD %lu: %s\n", osd_num, strerror(-peer_fd));
|
printf("Failed to connect to peer OSD %lu: %s\n", osd_num, strerror(-peer_fd));
|
||||||
|
@ -144,83 +145,202 @@ void osd_t::handle_peers()
|
||||||
int i;
|
int i;
|
||||||
for (i = 0; i < peers.size(); i++)
|
for (i = 0; i < peers.size(); i++)
|
||||||
{
|
{
|
||||||
auto it = osd_peer_fds.find(peers[i].osd_num);
|
if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end())
|
||||||
if (it == osd_peer_fds.end() || clients[it->second].peer_state != PEER_CONNECTED)
|
|
||||||
{
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (i >= peers.size())
|
if (i >= peers.size())
|
||||||
{
|
{
|
||||||
// Start PG peering
|
// Connected to all peers
|
||||||
pgs[0].state = PG_PEERING;
|
peering_state = peering_state & ~OSD_PEERING_PEERS;
|
||||||
pgs[0].state_dict.clear();
|
|
||||||
pgs[0].obj_states.clear();
|
|
||||||
pgs[0].ver_override.clear();
|
|
||||||
if (pgs[0].peering_state)
|
|
||||||
delete pgs[0].peering_state;
|
|
||||||
peering_state = 2;
|
|
||||||
ringloop->wakeup();
|
|
||||||
}
|
}
|
||||||
|
repeer_pgs(osd_num, true);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (peering_state & 2)
|
if (peering_state & OSD_PEERING_PGS)
|
||||||
{
|
{
|
||||||
|
bool still_doing_pgs = false;
|
||||||
for (int i = 0; i < pgs.size(); i++)
|
for (int i = 0; i < pgs.size(); i++)
|
||||||
{
|
{
|
||||||
if (pgs[i].state == PG_PEERING)
|
if (pgs[i].state == PG_PEERING)
|
||||||
{
|
{
|
||||||
if (!pgs[i].peering_state)
|
if (!pgs[i].peering_state->list_ops.size())
|
||||||
{
|
|
||||||
start_pg_peering(i);
|
|
||||||
}
|
|
||||||
else if (pgs[i].peering_state->list_done >= 3)
|
|
||||||
{
|
{
|
||||||
pgs[i].calc_object_states();
|
pgs[i].calc_object_states();
|
||||||
peering_state = 0;
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
still_doing_pgs = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!still_doing_pgs)
|
||||||
|
{
|
||||||
|
// Done all PGs
|
||||||
|
peering_state = peering_state & ~OSD_PEERING_PGS;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected)
|
||||||
|
{
|
||||||
|
// Re-peer affected PGs
|
||||||
|
// FIXME: We shouldn't rely just on target_set. Other OSDs may also contain PG data.
|
||||||
|
osd_num_t real_osd = (is_connected ? osd_num : 0);
|
||||||
|
for (int i = 0; i < pgs.size(); i++)
|
||||||
|
{
|
||||||
|
bool repeer = false;
|
||||||
|
for (int r = 0; r < pgs[i].target_set.size(); r++)
|
||||||
|
{
|
||||||
|
if (pgs[i].target_set[r] == osd_num &&
|
||||||
|
pgs[i].cur_set[r] != real_osd)
|
||||||
|
{
|
||||||
|
pgs[i].cur_set[r] = real_osd;
|
||||||
|
repeer = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (repeer)
|
||||||
|
{
|
||||||
|
// Repeer this pg
|
||||||
|
printf("Repeer PG %d because of OSD %lu\n", i, osd_num);
|
||||||
|
start_pg_peering(i);
|
||||||
|
peering_state |= OSD_PEERING_PGS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Repeer on each connect/disconnect peer event
|
||||||
void osd_t::start_pg_peering(int pg_idx)
|
void osd_t::start_pg_peering(int pg_idx)
|
||||||
{
|
{
|
||||||
// FIXME: Set PG_INCOMPLETE if incomplete
|
|
||||||
auto & pg = pgs[pg_idx];
|
auto & pg = pgs[pg_idx];
|
||||||
auto ps = pg.peering_state = new pg_peering_state_t();
|
pg.state = PG_PEERING;
|
||||||
|
pg.state_dict.clear();
|
||||||
|
pg.obj_states.clear();
|
||||||
|
pg.ver_override.clear();
|
||||||
|
pg.pg_cursize = 0;
|
||||||
|
for (int role = 0; role < pg.cur_set.size(); role++)
|
||||||
{
|
{
|
||||||
osd_num_t osd_num = this->osd_num;
|
if (pg.cur_set[role] != 0)
|
||||||
|
{
|
||||||
|
pg.pg_cursize++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pg.pg_cursize < pg.pg_minsize)
|
||||||
|
{
|
||||||
|
pg.state = PG_INCOMPLETE;
|
||||||
|
}
|
||||||
|
if (pg.peering_state)
|
||||||
|
{
|
||||||
|
// Adjust the peering operation that's still in progress
|
||||||
|
for (auto & p: pg.peering_state->list_ops)
|
||||||
|
{
|
||||||
|
int role;
|
||||||
|
for (role = 0; role < pg.cur_set.size(); role++)
|
||||||
|
{
|
||||||
|
if (pg.cur_set[role] == p.first)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size())
|
||||||
|
{
|
||||||
|
// Discard the result after completion, which, chances are, will be unsuccessful
|
||||||
|
auto list_op = p.second;
|
||||||
|
if (list_op->peer_fd == 0)
|
||||||
|
{
|
||||||
|
// Self
|
||||||
|
list_op->bs_op.callback = [list_op](blockstore_op_t *bs_op)
|
||||||
|
{
|
||||||
|
if (list_op->bs_op.buf)
|
||||||
|
free(list_op->bs_op.buf);
|
||||||
|
delete list_op;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Peer
|
||||||
|
list_op->callback = [](osd_op_t *list_op)
|
||||||
|
{
|
||||||
|
delete list_op;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
pg.peering_state->list_ops.erase(p.first);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (auto & p: pg.peering_state->list_results)
|
||||||
|
{
|
||||||
|
int role;
|
||||||
|
for (role = 0; role < pg.cur_set.size(); role++)
|
||||||
|
{
|
||||||
|
if (pg.cur_set[role] == p.first)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size())
|
||||||
|
{
|
||||||
|
pg.peering_state->list_results.erase(p.first);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pg.state == PG_INCOMPLETE)
|
||||||
|
{
|
||||||
|
if (pg.peering_state)
|
||||||
|
{
|
||||||
|
delete pg.peering_state;
|
||||||
|
pg.peering_state = NULL;
|
||||||
|
}
|
||||||
|
printf("PG %d is incomplete\n", pg.pg_num);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!pg.peering_state)
|
||||||
|
{
|
||||||
|
pg.peering_state = new pg_peering_state_t();
|
||||||
|
}
|
||||||
|
auto ps = pg.peering_state;
|
||||||
|
for (int role = 0; role < pg.cur_set.size(); role++)
|
||||||
|
{
|
||||||
|
osd_num_t role_osd = pg.cur_set[role];
|
||||||
|
if (!role_osd)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (ps->list_ops.find(role_osd) != ps->list_ops.end() ||
|
||||||
|
ps->list_results.find(role_osd) != ps->list_results.end())
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (role_osd == this->osd_num)
|
||||||
|
{
|
||||||
|
// Self
|
||||||
osd_op_t *op = new osd_op_t();
|
osd_op_t *op = new osd_op_t();
|
||||||
op->op_type = 0;
|
op->op_type = 0;
|
||||||
op->peer_fd = 0;
|
op->peer_fd = 0;
|
||||||
op->bs_op.opcode = BS_OP_LIST;
|
op->bs_op.opcode = BS_OP_LIST;
|
||||||
op->bs_op.callback = [ps, op, osd_num](blockstore_op_t *bs_op)
|
op->bs_op.callback = [ps, op, role_osd](blockstore_op_t *bs_op)
|
||||||
{
|
{
|
||||||
if (op->bs_op.retval < 0)
|
if (op->bs_op.retval < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error("OP_LIST failed");
|
throw std::runtime_error("local OP_LIST failed");
|
||||||
}
|
}
|
||||||
printf(
|
printf(
|
||||||
"Got object list from OSD %lu (local): %d objects (%lu of them stable)\n",
|
"Got object list from OSD %lu (local): %d objects (%lu of them stable)\n",
|
||||||
osd_num, bs_op->retval, bs_op->version
|
role_osd, bs_op->retval, bs_op->version
|
||||||
);
|
);
|
||||||
ps->list_results[osd_num] = {
|
ps->list_results[role_osd] = {
|
||||||
.buf = (obj_ver_id*)op->bs_op.buf,
|
.buf = (obj_ver_id*)op->bs_op.buf,
|
||||||
.total_count = (uint64_t)op->bs_op.retval,
|
.total_count = (uint64_t)op->bs_op.retval,
|
||||||
.stable_count = op->bs_op.version,
|
.stable_count = op->bs_op.version,
|
||||||
};
|
};
|
||||||
ps->list_done++;
|
ps->list_done++;
|
||||||
|
ps->list_ops.erase(role_osd);
|
||||||
delete op;
|
delete op;
|
||||||
};
|
};
|
||||||
bs->enqueue_op(&op->bs_op);
|
bs->enqueue_op(&op->bs_op);
|
||||||
|
ps->list_ops[role_osd] = op;
|
||||||
}
|
}
|
||||||
for (int i = 0; i < peers.size(); i++)
|
else
|
||||||
{
|
{
|
||||||
osd_num_t osd_num = peers[i].osd_num;
|
// Peer
|
||||||
auto & cl = clients[osd_peer_fds[peers[i].osd_num]];
|
auto & cl = clients[osd_peer_fds[role_osd]];
|
||||||
osd_op_t *op = new osd_op_t();
|
osd_op_t *op = new osd_op_t();
|
||||||
op->op_type = OSD_OP_OUT;
|
op->op_type = OSD_OP_OUT;
|
||||||
op->peer_fd = cl.peer_fd;
|
op->peer_fd = cl.peer_fd;
|
||||||
|
@ -228,32 +348,42 @@ void osd_t::start_pg_peering(int pg_idx)
|
||||||
.sec_list = {
|
.sec_list = {
|
||||||
.header = {
|
.header = {
|
||||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||||
.id = 1,
|
.id = this->next_subop_id++,
|
||||||
.opcode = OSD_OP_SECONDARY_LIST,
|
.opcode = OSD_OP_SECONDARY_LIST,
|
||||||
},
|
},
|
||||||
.pgnum = pg.pg_num,
|
.pgnum = pg.pg_num,
|
||||||
.pgtotal = pg_count,
|
.pgtotal = pg_count,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
op->callback = [ps, osd_num](osd_op_t *op)
|
op->callback = [this, ps, role_osd](osd_op_t *op)
|
||||||
{
|
{
|
||||||
if (op->reply.hdr.retval < 0)
|
if (op->reply.hdr.retval < 0)
|
||||||
{
|
{
|
||||||
throw std::runtime_error("OP_LIST failed");
|
int peer_fd = op->peer_fd;
|
||||||
|
printf("Failed to get object list from OSD %lu, disconnecting peer\n", role_osd);
|
||||||
|
delete op;
|
||||||
|
ps->list_ops.erase(role_osd);
|
||||||
|
stop_client(peer_fd);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
printf(
|
printf(
|
||||||
"Got object list from OSD %lu: %ld objects (%lu of them stable)\n",
|
"Got object list from OSD %lu: %ld objects (%lu of them stable)\n",
|
||||||
osd_num, op->reply.hdr.retval, op->reply.sec_list.stable_count
|
role_osd, op->reply.hdr.retval, op->reply.sec_list.stable_count
|
||||||
);
|
);
|
||||||
ps->list_results[osd_num] = {
|
ps->list_results[role_osd] = {
|
||||||
.buf = (obj_ver_id*)op->buf,
|
.buf = (obj_ver_id*)op->buf,
|
||||||
.total_count = (uint64_t)op->reply.hdr.retval,
|
.total_count = (uint64_t)op->reply.hdr.retval,
|
||||||
.stable_count = op->reply.sec_list.stable_count,
|
.stable_count = op->reply.sec_list.stable_count,
|
||||||
};
|
};
|
||||||
|
// so it doesn't get freed. FIXME: do it better
|
||||||
op->buf = NULL;
|
op->buf = NULL;
|
||||||
ps->list_done++;
|
ps->list_done++;
|
||||||
|
ps->list_ops.erase(role_osd);
|
||||||
delete op;
|
delete op;
|
||||||
};
|
};
|
||||||
outbox_push(cl, op);
|
outbox_push(cl, op);
|
||||||
|
ps->list_ops[role_osd] = op;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
ringloop->wakeup();
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@ void pg_t::remember_object(pg_obj_state_check_t &st, std::vector<obj_ver_role> &
|
||||||
else if (st.n_roles < pg.pg_minsize)
|
else if (st.n_roles < pg.pg_minsize)
|
||||||
{
|
{
|
||||||
state = OBJ_INCOMPLETE;
|
state = OBJ_INCOMPLETE;
|
||||||
pg.state = pg.state | PG_HAS_INCOMPLETE;
|
pg.state = pg.state | PG_HAS_UNFOUND;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -164,6 +164,7 @@ void pg_t::calc_object_states()
|
||||||
std::sort(all.begin(), all.end());
|
std::sort(all.begin(), all.end());
|
||||||
// Walk over it and check object states
|
// Walk over it and check object states
|
||||||
pg.clean_count = 0;
|
pg.clean_count = 0;
|
||||||
|
pg.state = 0;
|
||||||
int replica = 0;
|
int replica = 0;
|
||||||
pg_obj_state_check_t st;
|
pg_obj_state_check_t st;
|
||||||
for (int i = 0; i < all.size(); i++)
|
for (int i = 0; i < all.size(); i++)
|
||||||
|
@ -225,7 +226,7 @@ void pg_t::calc_object_states()
|
||||||
{
|
{
|
||||||
st.n_stable++;
|
st.n_stable++;
|
||||||
}
|
}
|
||||||
if (pg.target_set[replica] == all[i].osd_num)
|
if (pg.cur_set[replica] == all[i].osd_num)
|
||||||
{
|
{
|
||||||
st.n_matched++;
|
st.n_matched++;
|
||||||
}
|
}
|
||||||
|
@ -242,5 +243,17 @@ void pg_t::calc_object_states()
|
||||||
st.obj_end = st.ver_end = all.size();
|
st.obj_end = st.ver_end = all.size();
|
||||||
remember_object(st, all);
|
remember_object(st, all);
|
||||||
}
|
}
|
||||||
|
if (pg.pg_cursize < pg.pg_size)
|
||||||
|
{
|
||||||
|
pg.state = pg.state | PG_DEGRADED;
|
||||||
|
}
|
||||||
|
printf(
|
||||||
|
"PG %u is active%s%s%s%s\n", pg.pg_num,
|
||||||
|
(pg.state & PG_DEGRADED) ? " + degraded" : "",
|
||||||
|
(pg.state & PG_HAS_UNFOUND) ? " + has_unfound" : "",
|
||||||
|
(pg.state & PG_HAS_DEGRADED) ? " + has_degraded" : "",
|
||||||
|
(pg.state & PG_HAS_MISPLACED) ? " + has_misplaced" : "",
|
||||||
|
(pg.state & PG_HAS_UNCLEAN) ? " + has_unclean" : ""
|
||||||
|
);
|
||||||
pg.state = pg.state | PG_ACTIVE;
|
pg.state = pg.state | PG_ACTIVE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,10 +14,11 @@
|
||||||
#define PG_INCOMPLETE (1<<2)
|
#define PG_INCOMPLETE (1<<2)
|
||||||
#define PG_ACTIVE (1<<3)
|
#define PG_ACTIVE (1<<3)
|
||||||
// Plus any of these:
|
// Plus any of these:
|
||||||
#define PG_HAS_INCOMPLETE (1<<4)
|
#define PG_DEGRADED (1<<4)
|
||||||
#define PG_HAS_DEGRADED (1<<5)
|
#define PG_HAS_UNFOUND (1<<5)
|
||||||
#define PG_HAS_MISPLACED (1<<6)
|
#define PG_HAS_DEGRADED (1<<6)
|
||||||
#define PG_HAS_UNCLEAN (1<<7)
|
#define PG_HAS_MISPLACED (1<<7)
|
||||||
|
#define PG_HAS_UNCLEAN (1<<8)
|
||||||
|
|
||||||
// OSD object states
|
// OSD object states
|
||||||
#define OBJ_CLEAN 0x01
|
#define OBJ_CLEAN 0x01
|
||||||
|
@ -40,7 +41,9 @@ typedef std::vector<pg_obj_loc_t> pg_osd_set_t;
|
||||||
|
|
||||||
struct pg_osd_set_state_t
|
struct pg_osd_set_state_t
|
||||||
{
|
{
|
||||||
|
// (role -> osd_num_t) map, as in pg.target_set and pg.cur_set
|
||||||
std::vector<osd_num_t> read_target;
|
std::vector<osd_num_t> read_target;
|
||||||
|
// full OSD set including additional OSDs where the object is misplaced
|
||||||
pg_osd_set_t osd_set;
|
pg_osd_set_t osd_set;
|
||||||
uint64_t state = 0;
|
uint64_t state = 0;
|
||||||
uint64_t object_count = 0;
|
uint64_t object_count = 0;
|
||||||
|
@ -53,9 +56,12 @@ struct pg_list_result_t
|
||||||
uint64_t stable_count;
|
uint64_t stable_count;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct osd_op_t;
|
||||||
|
|
||||||
struct pg_peering_state_t
|
struct pg_peering_state_t
|
||||||
{
|
{
|
||||||
// osd_num -> list result
|
// osd_num -> list result
|
||||||
|
spp::sparse_hash_map<osd_num_t, osd_op_t*> list_ops;
|
||||||
spp::sparse_hash_map<osd_num_t, pg_list_result_t> list_results;
|
spp::sparse_hash_map<osd_num_t, pg_list_result_t> list_results;
|
||||||
int list_done = 0;
|
int list_done = 0;
|
||||||
};
|
};
|
||||||
|
@ -103,9 +109,12 @@ struct pg_t
|
||||||
uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2;
|
uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2;
|
||||||
pg_num_t pg_num;
|
pg_num_t pg_num;
|
||||||
uint64_t clean_count = 0;
|
uint64_t clean_count = 0;
|
||||||
// target_set = (role => osd_num or UINT64_MAX if missing). role numbers start with zero
|
// target_set is the "correct" peer OSD set for this PG
|
||||||
std::vector<osd_num_t> target_set;
|
std::vector<osd_num_t> target_set;
|
||||||
// moved object map. by default, each object is considered to reside on the target_set.
|
// cur_set is the current set of connected peer OSDs for this PG
|
||||||
|
// cur_set = (role => osd_num or UINT64_MAX if missing). role numbers begin with zero
|
||||||
|
std::vector<osd_num_t> cur_set;
|
||||||
|
// moved object map. by default, each object is considered to reside on the cur_set.
|
||||||
// this map stores all objects that differ.
|
// this map stores all objects that differ.
|
||||||
// it may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario
|
// it may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario
|
||||||
// which is up to ~192 MB per 1 TB in the worst case scenario
|
// which is up to ~192 MB per 1 TB in the worst case scenario
|
||||||
|
|
|
@ -54,7 +54,9 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
|
||||||
pg_num_t pg_num = (oid % pg_count); // FIXME +1
|
pg_num_t pg_num = (oid % pg_count); // FIXME +1
|
||||||
if (((end - 1) / (bs_block_size*2)) != oid.stripe ||
|
if (((end - 1) / (bs_block_size*2)) != oid.stripe ||
|
||||||
(start % bs_disk_alignment) || (end % bs_disk_alignment) ||
|
(start % bs_disk_alignment) || (end % bs_disk_alignment) ||
|
||||||
pg_num > pgs.size())
|
pg_num > pgs.size() ||
|
||||||
|
// FIXME: Postpone operations in inactive PGs
|
||||||
|
!(pgs[pg_num].state & PG_ACTIVE))
|
||||||
{
|
{
|
||||||
finish_primary_op(cur_op, -EINVAL);
|
finish_primary_op(cur_op, -EINVAL);
|
||||||
return;
|
return;
|
||||||
|
@ -79,30 +81,28 @@ void osd_t::exec_primary_read(osd_op_t *cur_op)
|
||||||
auto vo_it = pgs[pg_num].ver_override.find(oid);
|
auto vo_it = pgs[pg_num].ver_override.find(oid);
|
||||||
op_data->target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it->second : UINT64_MAX;
|
op_data->target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it->second : UINT64_MAX;
|
||||||
}
|
}
|
||||||
if (pgs[pg_num].pg_cursize == pgs[pg_num].pg_size)
|
if (pgs[pg_num].state == PG_ACTIVE)
|
||||||
{
|
{
|
||||||
// Fast happy-path
|
// Fast happy-path
|
||||||
submit_read_subops(pgs[pg_num].pg_minsize, pgs[pg_num].target_set.data(), cur_op);
|
submit_read_subops(pgs[pg_num].pg_minsize, pgs[pg_num].cur_set.data(), cur_op);
|
||||||
cur_op->send_list.push_back(cur_op->buf, cur_op->op.rw.len);
|
cur_op->send_list.push_back(cur_op->buf, cur_op->op.rw.len);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// PG is degraded
|
// PG may be degraded or have misplaced objects
|
||||||
uint64_t* target_set;
|
spp::sparse_hash_map<object_id, pg_osd_set_state_t*> obj_states;
|
||||||
{
|
auto st_it = pgs[pg_num].obj_states.find(oid);
|
||||||
auto it = pgs[pg_num].obj_states.find(oid);
|
uint64_t* cur_set = (st_it != pgs[pg_num].obj_states.end()
|
||||||
target_set = (it != pgs[pg_num].obj_states.end()
|
? st_it->second->read_target.data()
|
||||||
? it->second->read_target.data()
|
: pgs[pg_num].cur_set.data());
|
||||||
: pgs[pg_num].target_set.data());
|
if (extend_missing_stripes(stripes, cur_set, pgs[pg_num].pg_minsize, pgs[pg_num].pg_size) < 0)
|
||||||
}
|
|
||||||
if (extend_missing_stripes(stripes, target_set, pgs[pg_num].pg_minsize, pgs[pg_num].pg_size) < 0)
|
|
||||||
{
|
{
|
||||||
free(op_data);
|
free(op_data);
|
||||||
finish_primary_op(cur_op, -EIO);
|
finish_primary_op(cur_op, -EIO);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Submit reads
|
// Submit reads
|
||||||
submit_read_subops(pgs[pg_num].pg_size, target_set, cur_op);
|
submit_read_subops(pgs[pg_num].pg_size, cur_set, cur_op);
|
||||||
op_data->pg_minsize = pgs[pg_num].pg_minsize;
|
op_data->pg_minsize = pgs[pg_num].pg_minsize;
|
||||||
op_data->pg_size = pgs[pg_num].pg_size;
|
op_data->pg_size = pgs[pg_num].pg_size;
|
||||||
op_data->degraded = 1;
|
op_data->degraded = 1;
|
||||||
|
@ -158,11 +158,11 @@ void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int osd_t::extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *target_set, int minsize, int size)
|
int osd_t::extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size)
|
||||||
{
|
{
|
||||||
for (int role = 0; role < minsize; role++)
|
for (int role = 0; role < minsize; role++)
|
||||||
{
|
{
|
||||||
if (stripes[role].end != 0 && target_set[role] == 0)
|
if (stripes[role].end != 0 && osd_set[role] == 0)
|
||||||
{
|
{
|
||||||
stripes[role].real_start = stripes[role].real_end = 0;
|
stripes[role].real_start = stripes[role].real_end = 0;
|
||||||
// Stripe is missing. Extend read to other stripes.
|
// Stripe is missing. Extend read to other stripes.
|
||||||
|
@ -170,7 +170,7 @@ int osd_t::extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *target_
|
||||||
int exist = 0;
|
int exist = 0;
|
||||||
for (int j = 0; j < size; j++)
|
for (int j = 0; j < size; j++)
|
||||||
{
|
{
|
||||||
if (target_set[j] != 0)
|
if (osd_set[j] != 0)
|
||||||
{
|
{
|
||||||
if (stripes[j].real_end == 0 || j >= minsize)
|
if (stripes[j].real_end == 0 || j >= minsize)
|
||||||
{
|
{
|
||||||
|
@ -199,7 +199,7 @@ int osd_t::extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *target_
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void osd_t::submit_read_subops(int read_pg_size, const uint64_t* target_set, osd_op_t *cur_op)
|
void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op)
|
||||||
{
|
{
|
||||||
osd_primary_read_t *op_data = (osd_primary_read_t*)cur_op->op_data;
|
osd_primary_read_t *op_data = (osd_primary_read_t*)cur_op->op_data;
|
||||||
osd_read_stripe_t *stripes = op_data->stripes;
|
osd_read_stripe_t *stripes = op_data->stripes;
|
||||||
|
@ -230,7 +230,7 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* target_set, osd
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
auto role_osd_num = target_set[role];
|
auto role_osd_num = osd_set[role];
|
||||||
if (role_osd_num != 0)
|
if (role_osd_num != 0)
|
||||||
{
|
{
|
||||||
if (role_osd_num == this->osd_num)
|
if (role_osd_num == this->osd_num)
|
||||||
|
|
2
test.cpp
2
test.cpp
|
@ -344,9 +344,9 @@ int main05(int argc, char *argv[])
|
||||||
.state = PG_PEERING,
|
.state = PG_PEERING,
|
||||||
.pg_num = 1,
|
.pg_num = 1,
|
||||||
.target_set = { 1, 2, 3 },
|
.target_set = { 1, 2, 3 },
|
||||||
|
.cur_set = { 1, 2, 3 },
|
||||||
.peering_state = new pg_peering_state_t(),
|
.peering_state = new pg_peering_state_t(),
|
||||||
};
|
};
|
||||||
pg.peering_state->list_done = 3;
|
|
||||||
for (uint64_t osd_num = 1; osd_num <= 3; osd_num++)
|
for (uint64_t osd_num = 1; osd_num <= 3; osd_num++)
|
||||||
{
|
{
|
||||||
pg_list_result_t r = {
|
pg_list_result_t r = {
|
||||||
|
|
Loading…
Reference in New Issue