vitastor/osd_peering.cpp

627 lines
20 KiB
C++

#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <algorithm>
#include "osd.h"
void osd_t::init_primary()
{
// Initial test version of clustering code requires exactly 2 peers
// FIXME Hardcode
std::string peerstr = config["peers"];
while (peerstr.size())
{
int pos = peerstr.find(',');
peers.push_back(parse_peer(pos < 0 ? peerstr : peerstr.substr(0, pos)));
peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1);
for (int i = 0; i < peers.size()-1; i++)
if (peers[i].osd_num == peers[peers.size()-1].osd_num)
throw std::runtime_error("same osd number "+std::to_string(peers[i].osd_num)+" specified twice in peers");
}
if (peers.size() < 2)
throw std::runtime_error("run_primary requires at least 2 peers");
pgs[1] = (pg_t){
.state = PG_OFFLINE,
.pg_cursize = 0,
.pg_num = 1,
.target_set = { 1, 2, 3 },
.cur_set = { 1, 0, 0 },
};
pgs[1].print_state();
pg_count = 1;
peering_state = OSD_CONNECTING_PEERS;
}
osd_peer_def_t osd_t::parse_peer(std::string peer)
{
// OSD_NUM:IP:PORT
int pos1 = peer.find(':');
int pos2 = peer.find(':', pos1+1);
if (pos1 < 0 || pos2 < 0)
throw new std::runtime_error("OSD peer string must be in the form OSD_NUM:IP:PORT");
osd_peer_def_t r;
r.addr = peer.substr(pos1+1, pos2-pos1-1);
std::string osd_num_str = peer.substr(0, pos1);
std::string port_str = peer.substr(pos2+1);
r.osd_num = strtoull(osd_num_str.c_str(), NULL, 10);
if (!r.osd_num)
throw new std::runtime_error("Could not parse OSD peer osd_num");
r.port = strtoull(port_str.c_str(), NULL, 10);
if (!r.port)
throw new std::runtime_error("Could not parse OSD peer port");
return r;
}
void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback)
{
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
{
callback(osd_num, -EINVAL);
return;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(peer_port ? peer_port : 11203);
int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
if (peer_fd < 0)
{
callback(osd_num, -errno);
return;
}
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
if (r < 0 && errno != EINPROGRESS)
{
close(peer_fd);
callback(osd_num, -errno);
return;
}
clients[peer_fd] = (osd_client_t){
.peer_addr = addr,
.peer_port = peer_port,
.peer_fd = peer_fd,
.peer_state = PEER_CONNECTING,
.connect_callback = callback,
.osd_num = osd_num,
.in_buf = malloc(receive_buffer_size),
};
osd_peer_fds[osd_num] = peer_fd;
// Add FD to epoll (EPOLLOUT for tracking connect() result)
epoll_event ev;
ev.data.fd = peer_fd;
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
}
void osd_t::handle_connect_result(int peer_fd)
{
auto & cl = clients[peer_fd];
osd_num_t osd_num = cl.osd_num;
auto callback = cl.connect_callback;
int result = 0;
socklen_t result_len = sizeof(result);
if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
{
result = errno;
}
if (result != 0)
{
stop_client(peer_fd);
callback(osd_num, -result);
return;
}
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
// Disable EPOLLOUT on this fd
cl.connect_callback = NULL;
cl.peer_state = PEER_CONNECTED;
epoll_event ev;
ev.data.fd = peer_fd;
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
callback(osd_num, peer_fd);
}
// Peering loop
void osd_t::handle_peers()
{
if (peering_state & OSD_CONNECTING_PEERS)
{
for (int i = 0; i < peers.size(); i++)
{
if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end() &&
time(NULL) - peers[i].last_connect_attempt > 5) // FIXME hardcode 5
{
peers[i].last_connect_attempt = time(NULL);
connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](osd_num_t osd_num, int peer_fd)
{
// FIXME: Check peer config after connecting
if (peer_fd < 0)
{
printf("Failed to connect to peer OSD %lu: %s\n", osd_num, strerror(-peer_fd));
return;
}
printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd);
int i;
for (i = 0; i < peers.size(); i++)
{
if (osd_peer_fds.find(peers[i].osd_num) == osd_peer_fds.end())
break;
}
if (i >= peers.size())
{
// Connected to all peers
peering_state = peering_state & ~OSD_CONNECTING_PEERS;
}
repeer_pgs(osd_num, true);
});
}
}
}
if (peering_state & OSD_PEERING_PGS)
{
bool still = false;
for (auto & p: pgs)
{
if (p.second.state == PG_PEERING)
{
if (!p.second.peering_state->list_ops.size())
{
p.second.calc_object_states();
if (p.second.state & PG_HAS_UNCLEAN)
{
peering_state = peering_state | OSD_FLUSHING_PGS;
}
}
else
{
still = true;
}
}
}
if (!still)
{
// Done all PGs
peering_state = peering_state & ~OSD_PEERING_PGS;
}
}
if (peering_state & OSD_FLUSHING_PGS)
{
bool still = false;
for (auto & p: pgs)
{
if (p.second.state & PG_HAS_UNCLEAN)
{
if (!p.second.flush_batch)
{
submit_pg_flush_ops(p.first);
}
still = true;
}
}
if (!still)
{
peering_state = peering_state & ~OSD_FLUSHING_PGS;
}
}
}
#define FLUSH_BATCH 512
struct pg_flush_batch_t
{
std::map<osd_num_t, std::vector<obj_ver_id>> rollback_lists;
std::map<osd_num_t, std::vector<obj_ver_id>> stable_lists;
int flush_ops = 0, flush_done = 0;
int flush_objects = 0;
};
void osd_t::submit_pg_flush_ops(pg_num_t pg_num)
{
pg_t & pg = pgs[pg_num];
pg_flush_batch_t *fb = new pg_flush_batch_t();
pg.flush_batch = fb;
auto it = pg.flush_actions.begin(), prev_it = pg.flush_actions.begin();
bool first = true;
while (it != pg.flush_actions.end())
{
if (!first && (it->first.oid.inode != prev_it->first.oid.inode ||
(it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK)) &&
fb->rollback_lists[it->first.osd_num].size() >= FLUSH_BATCH ||
fb->stable_lists[it->first.osd_num].size() >= FLUSH_BATCH)
{
// Stop only at the object boundary
break;
}
it->second.submitted = true;
if (it->second.rollback)
{
fb->flush_objects++;
fb->rollback_lists[it->first.osd_num].push_back((obj_ver_id){
.oid = it->first.oid,
.version = it->second.rollback_to,
});
}
if (it->second.make_stable)
{
fb->flush_objects++;
fb->stable_lists[it->first.osd_num].push_back((obj_ver_id){
.oid = it->first.oid,
.version = it->second.stable_to,
});
}
prev_it = it;
first = false;
it++;
}
for (auto & l: fb->rollback_lists)
{
if (l.second.size() > 0)
{
fb->flush_ops++;
submit_flush_op(pg.pg_num, fb, true, l.first, l.second.size(), l.second.data());
}
}
for (auto & l: fb->stable_lists)
{
if (l.second.size() > 0)
{
fb->flush_ops++;
submit_flush_op(pg.pg_num, fb, false, l.first, l.second.size(), l.second.data());
}
}
}
void osd_t::handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok)
{
if (pgs.find(pg_num) == pgs.end() || pgs[pg_num].flush_batch != fb)
{
// Throw the result away
return;
}
if (!ok)
{
if (osd_num == this->osd_num)
throw std::runtime_error("Error while doing local flush operation");
else
{
assert(osd_peer_fds.find(osd_num) != osd_peer_fds.end());
stop_client(osd_peer_fds[osd_num]);
return;
}
}
fb->flush_done++;
if (fb->flush_done == fb->flush_ops)
{
// This flush batch is done
std::vector<osd_op_t*> continue_ops;
auto & pg = pgs[pg_num];
auto it = pg.flush_actions.begin(), prev_it = it;
auto erase_start = it;
while (1)
{
if (it == pg.flush_actions.end() ||
it->first.oid.inode != prev_it->first.oid.inode ||
(it->first.oid.stripe & ~STRIPE_MASK) != (prev_it->first.oid.stripe & ~STRIPE_MASK))
{
auto wr_it = pg.write_queue.find((object_id){
.inode = prev_it->first.oid.inode,
.stripe = (prev_it->first.oid.stripe & ~STRIPE_MASK),
});
if (wr_it != pg.write_queue.end())
{
continue_ops.push_back(wr_it->second);
pg.write_queue.erase(wr_it);
}
}
if ((it == pg.flush_actions.end() || !it->second.submitted) &&
erase_start != it)
{
pg.flush_actions.erase(erase_start, it);
}
if (it == pg.flush_actions.end())
{
break;
}
prev_it = it;
if (!it->second.submitted)
{
it++;
erase_start = it;
}
else
{
it++;
}
}
delete fb;
pg.flush_batch = NULL;
if (!pg.flush_actions.size())
{
pg.state = pg.state & ~PG_HAS_UNCLEAN;
pg.print_state();
}
for (osd_op_t *op: continue_ops)
{
continue_primary_write(op);
}
}
}
void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data)
{
osd_op_t *op = new osd_op_t();
// Copy buffer so it gets freed along with the operation
op->buf = malloc(sizeof(obj_ver_id) * count);
memcpy(op->buf, data, sizeof(obj_ver_id) * count);
if (osd_num == this->osd_num)
{
// local
op->bs_op = new blockstore_op_t({
.opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE),
.callback = [this, op, pg_num, fb](blockstore_op_t *bs_op)
{
handle_flush_op(pg_num, fb, this->osd_num, bs_op->retval == 0);
delete op;
},
.len = (uint32_t)count,
.buf = op->buf,
});
bs->enqueue_op(op->bs_op);
}
else
{
// Peer
int peer_fd = osd_peer_fds[osd_num];
op->op_type = OSD_OP_OUT;
op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
op->send_list.push_back(op->buf, count * sizeof(obj_ver_id));
op->peer_fd = peer_fd;
op->req = {
.sec_stab = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = this->next_subop_id++,
.opcode = (uint64_t)(rollback ? OSD_OP_SECONDARY_ROLLBACK : OSD_OP_SECONDARY_STABILIZE),
},
.len = count * sizeof(obj_ver_id),
},
};
op->callback = [this, pg_num, fb](osd_op_t *op)
{
handle_flush_op(pg_num, fb, clients[op->peer_fd].osd_num, op->reply.hdr.retval == 0);
delete op;
};
outbox_push(clients[peer_fd], op);
}
}
void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected)
{
// Re-peer affected PGs
// FIXME: We shouldn't rely just on target_set. Other OSDs may also contain PG data.
osd_num_t real_osd = (is_connected ? osd_num : 0);
for (auto & p: pgs)
{
bool repeer = false;
for (int r = 0; r < p.second.target_set.size(); r++)
{
if (p.second.target_set[r] == osd_num &&
p.second.cur_set[r] != real_osd)
{
p.second.cur_set[r] = real_osd;
repeer = true;
break;
}
}
if (repeer)
{
// Repeer this pg
printf("Repeer PG %d because of OSD %lu\n", p.second.pg_num, osd_num);
start_pg_peering(p.second.pg_num);
peering_state |= OSD_PEERING_PGS;
}
}
}
// Repeer on each connect/disconnect peer event
void osd_t::start_pg_peering(pg_num_t pg_num)
{
auto & pg = pgs[pg_num];
pg.state = PG_PEERING;
pg.print_state();
pg.state_dict.clear();
pg.obj_states.clear();
pg.ver_override.clear();
pg.flush_actions.clear();
if (pg.flush_batch)
delete pg.flush_batch;
pg.flush_batch = NULL;
pg.pg_cursize = 0;
for (int role = 0; role < pg.cur_set.size(); role++)
{
if (pg.cur_set[role] != 0)
{
pg.pg_cursize++;
}
}
if (pg.pg_cursize < pg.pg_minsize)
{
pg.state = PG_INCOMPLETE;
pg.print_state();
}
if (pg.peering_state)
{
// Adjust the peering operation that's still in progress
for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end(); it++)
{
int role;
for (role = 0; role < pg.cur_set.size(); role++)
{
if (pg.cur_set[role] == it->first)
break;
}
if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size())
{
// Discard the result after completion, which, chances are, will be unsuccessful
auto list_op = it->second;
if (list_op->peer_fd == 0)
{
// Self
list_op->bs_op->callback = [list_op](blockstore_op_t *bs_op)
{
if (list_op->bs_op->buf)
free(list_op->bs_op->buf);
delete list_op;
};
}
else
{
// Peer
list_op->callback = [](osd_op_t *list_op)
{
delete list_op;
};
}
pg.peering_state->list_ops.erase(it);
it = pg.peering_state->list_ops.begin();
}
}
for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end(); it++)
{
int role;
for (role = 0; role < pg.cur_set.size(); role++)
{
if (pg.cur_set[role] == it->first)
break;
}
if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size())
{
if (it->second.buf)
{
free(it->second.buf);
}
pg.peering_state->list_results.erase(it);
it = pg.peering_state->list_results.begin();
}
}
}
if (pg.state == PG_INCOMPLETE)
{
if (pg.peering_state)
{
delete pg.peering_state;
pg.peering_state = NULL;
}
return;
}
if (!pg.peering_state)
{
pg.peering_state = new pg_peering_state_t();
}
auto ps = pg.peering_state;
for (int role = 0; role < pg.cur_set.size(); role++)
{
osd_num_t role_osd = pg.cur_set[role];
if (!role_osd)
{
continue;
}
if (ps->list_ops.find(role_osd) != ps->list_ops.end() ||
ps->list_results.find(role_osd) != ps->list_results.end())
{
continue;
}
if (role_osd == this->osd_num)
{
// Self
osd_op_t *op = new osd_op_t();
op->op_type = 0;
op->peer_fd = 0;
op->bs_op = new blockstore_op_t();
op->bs_op->opcode = BS_OP_LIST;
op->bs_op->oid.stripe = parity_block_size;
op->bs_op->len = pg_count;
op->bs_op->offset = pg.pg_num-1;
op->bs_op->callback = [ps, op, role_osd](blockstore_op_t *bs_op)
{
if (op->bs_op->retval < 0)
{
throw std::runtime_error("local OP_LIST failed");
}
printf(
"Got object list from OSD %lu (local): %d object versions (%lu of them stable)\n",
role_osd, bs_op->retval, bs_op->version
);
ps->list_results[role_osd] = {
.buf = (obj_ver_id*)op->bs_op->buf,
.total_count = (uint64_t)op->bs_op->retval,
.stable_count = op->bs_op->version,
};
ps->list_done++;
ps->list_ops.erase(role_osd);
delete op;
};
bs->enqueue_op(op->bs_op);
ps->list_ops[role_osd] = op;
}
else
{
// Peer
auto & cl = clients[osd_peer_fds[role_osd]];
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
op->peer_fd = cl.peer_fd;
op->req = {
.sec_list = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = this->next_subop_id++,
.opcode = OSD_OP_SECONDARY_LIST,
},
.list_pg = pg.pg_num,
.pg_count = pg_count,
.parity_block_size = parity_block_size,
},
};
op->callback = [this, ps, role_osd](osd_op_t *op)
{
if (op->reply.hdr.retval < 0)
{
printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval);
ps->list_ops.erase(role_osd);
stop_client(op->peer_fd);
delete op;
return;
}
printf(
"Got object list from OSD %lu: %ld object versions (%lu of them stable)\n",
role_osd, op->reply.hdr.retval, op->reply.sec_list.stable_count
);
ps->list_results[role_osd] = {
.buf = (obj_ver_id*)op->buf,
.total_count = (uint64_t)op->reply.hdr.retval,
.stable_count = op->reply.sec_list.stable_count,
};
// set op->buf to NULL so it doesn't get freed
op->buf = NULL;
ps->list_done++;
ps->list_ops.erase(role_osd);
delete op;
};
outbox_push(cl, op);
ps->list_ops[role_osd] = op;
}
}
ringloop->wakeup();
}