Compare commits
2 Commits
ddbaa802be
...
ee99e4abb1
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | ee99e4abb1 | |
Vitaliy Filippov | 0d01573da3 |
|
@ -218,5 +218,5 @@ All other client-side components are based on the client library:
|
|||
- Deleting images in a degraded cluster may currently lead to objects reappearing
|
||||
after dead OSDs come back, and in case of erasure-coded pools, they may even
|
||||
reappear as incomplete. Just repeat the removal request again in this case.
|
||||
This problem will be fixed in the nearest future, the fix is already implemented
|
||||
in the "epoch-deletions" branch.
|
||||
This problem will be fixed in the future, along with the metadata disk storage
|
||||
format update.
|
||||
|
|
|
@ -24,8 +24,8 @@
|
|||
Один OSD управляет одним диском (или разделом). OSD общаются с etcd и друг с другом — от etcd они
|
||||
получают состояние кластера, а друг другу передают запросы записи и чтения вторичных копий данных.
|
||||
- **etcd** — кластерная key/value база данных, используется для хранения настроек и верхнеуровневого
|
||||
состояния кластера, а также предотвращения разделения сознания. Блоки данных в etcd не хранятся,
|
||||
в обработке клиентских запросов чтения и записи etcd не участвует.
|
||||
состояния кластера, а также предотвращения разделения сознания (splitbrain). Блоки данных в etcd не
|
||||
хранятся, в обработке клиентских запросов чтения и записи etcd не участвует.
|
||||
- **Монитор** — отдельный демон на node.js, рассчитывающий необходимые изменения в конфигурацию
|
||||
кластера, сохраняющий эту информацию в etcd и таким образом командующий OSD применить эти изменения.
|
||||
Также агрегирует статистику. Контактирует только с etcd, OSD с монитором не общаются.
|
||||
|
@ -43,8 +43,8 @@
|
|||
|
||||
## Клиентские компоненты
|
||||
|
||||
- **Клиентская библиотека** — инкапсулирует логику на стороне клиента. Соединяются с etcd и со всеми OSD,
|
||||
от etcd получают состояние кластера, команды чтения и записи отправляют на все OSD напрямую.
|
||||
- **Клиентская библиотека** — инкапсулирует логику на стороне клиента. Соединяется с etcd и со всеми OSD,
|
||||
от etcd получает состояние кластера, команды чтения и записи отправляет на все OSD напрямую.
|
||||
В силу архитектуры все отдельные блоки данных (по умолчанию по 128 КБ) располагается на разных
|
||||
OSD, но клиент устроен так, что всегда точно знает, к какому OSD обращаться, и подключается
|
||||
к нему напрямую.
|
||||
|
@ -227,5 +227,5 @@
|
|||
- Удаление образов в деградированном кластере может в данный момент приводить к повторному
|
||||
"появлению" удалённых объектов после поднятия отключённых OSD, причём в случае EC-пулов,
|
||||
объекты могут появиться в виде "неполных". Если вы столкнётесь с такой ситуацией, просто
|
||||
повторите запрос удаления. Исправление этой проблемы уже реализовано в ветке "epoch-deletions"
|
||||
и вскоре будет включено в релиз.
|
||||
повторите запрос удаления. Данная проблема будет исправлена в будущем вместе с обновлением
|
||||
дискового формата хранения метаданных.
|
||||
|
|
|
@ -784,11 +784,17 @@ void nfs_client_t::stop()
|
|||
stopped = true;
|
||||
if (refs <= 0)
|
||||
{
|
||||
#ifdef WITH_RDMACM
|
||||
destroy_rdma_conn();
|
||||
#endif
|
||||
auto parent = this->parent;
|
||||
parent->rpc_clients.erase(this);
|
||||
parent->active_connections--;
|
||||
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
|
||||
close(nfs_fd);
|
||||
if (nfs_fd >= 0)
|
||||
{
|
||||
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
|
||||
close(nfs_fd);
|
||||
}
|
||||
delete this;
|
||||
parent->check_exit();
|
||||
}
|
||||
|
|
|
@ -117,7 +117,7 @@ public:
|
|||
nfs_rdma_conn_t *rdma_conn = NULL;
|
||||
|
||||
// <TCP>
|
||||
int nfs_fd;
|
||||
int nfs_fd = -1;
|
||||
int epoll_events = 0;
|
||||
|
||||
// Read state
|
||||
|
@ -149,4 +149,5 @@ public:
|
|||
void *rdma_malloc(size_t size);
|
||||
void rdma_encode_header(rpc_op_t *rop);
|
||||
void rdma_queue_reply(rpc_op_t *rop);
|
||||
void destroy_rdma_conn();
|
||||
};
|
||||
|
|
|
@ -44,7 +44,7 @@ struct nfs_rdma_context_t
|
|||
{
|
||||
std::string bind_address;
|
||||
int rdmacm_port = 0;
|
||||
int max_send = 8, max_recv = 8; // FIXME max_send and max_recv should probably be equal
|
||||
uint32_t max_iodepth = 16;
|
||||
uint64_t rdma_malloc_round_to = 1048576, rdma_max_unused_buffers = 500*1048576;
|
||||
uint64_t max_send_size = 256*1024, max_recv_size = 256*1024;
|
||||
|
||||
|
@ -72,12 +72,11 @@ struct nfs_rdma_conn_t
|
|||
nfs_rdma_context_t *ctx = NULL;
|
||||
nfs_client_t *client = NULL;
|
||||
rdma_cm_id *id = NULL;
|
||||
int max_send = 8, max_recv = 8; // FIXME set it
|
||||
int max_send_size = 256*1024, max_recv_size = 256*1024;
|
||||
int remote_max_send_size = 1024, remote_max_recv_size = 1024; // FIXME is it used?
|
||||
int remote_max_send_size = 1024, remote_max_recv_size = 1024;
|
||||
bool remote_invalidate = false;
|
||||
bool established = false;
|
||||
int cur_recv = 0, cur_send = 0; // FIXME use it ...
|
||||
uint32_t cur_credit = 16;
|
||||
std::vector<nfs_rdma_buf_t> recv_buffers;
|
||||
std::map<void*, nfs_rdma_buf_t> used_buffers;
|
||||
int next_recv_buf = 0;
|
||||
|
@ -92,6 +91,7 @@ struct nfs_rdma_conn_t
|
|||
void post_recv(nfs_rdma_buf_t b);
|
||||
void post_send();
|
||||
bool handle_recv(void *buf, size_t len);
|
||||
void free_rdma_rpc_op(rpc_op_t *rop);
|
||||
};
|
||||
|
||||
nfs_rdma_context_t* nfs_proxy_t::create_rdma(const std::string & bind_address, int rdmacm_port)
|
||||
|
@ -240,7 +240,7 @@ void nfs_rdma_context_t::handle_rdmacm_events()
|
|||
fprintf(stderr, "Received %s event for connection 0x%lx - closing it\n",
|
||||
event_type_name, (uint64_t)ev->id);
|
||||
auto conn = conn_it->second;
|
||||
delete conn;
|
||||
conn->client->stop();
|
||||
}
|
||||
}
|
||||
else if (ev->event == RDMA_CM_EVENT_ESTABLISHED)
|
||||
|
@ -267,7 +267,7 @@ void nfs_rdma_context_t::handle_rdmacm_events()
|
|||
|
||||
void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev)
|
||||
{
|
||||
this->used_max_cqe += max_send+max_recv;
|
||||
this->used_max_cqe += max_iodepth*2;
|
||||
if (this->used_max_cqe > this->max_cqe)
|
||||
{
|
||||
// Resize CQ
|
||||
|
@ -287,8 +287,9 @@ void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev)
|
|||
.send_cq = this->cq,
|
||||
.recv_cq = this->cq,
|
||||
.cap = {
|
||||
.max_send_wr = max_send*2, // FIXME how many max_send/max_recv?
|
||||
.max_recv_wr = max_recv,
|
||||
// each op at each moment takes 1 RDMA_RECV or 1 RDMA_READ or 1 RDMA_WRITE + 1 RDMA_SEND
|
||||
.max_send_wr = max_iodepth*2,
|
||||
.max_recv_wr = max_iodepth,
|
||||
.max_send_sge = 1, // we don't need S/G currently
|
||||
.max_recv_sge = 1,
|
||||
},
|
||||
|
@ -304,8 +305,8 @@ void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev)
|
|||
.format_identifier = NFS_RDMACM_PRIVATE_DATA_MAGIC_LE,
|
||||
.version = 1,
|
||||
.remote_invalidate = 0, // FIXME what is remote_invalidate?
|
||||
.max_send_size = (max_send_size <= 256*1024 ? max_send_size/1024 - 1 : 255),
|
||||
.max_recv_size = (max_recv_size <= 256*1024 ? max_recv_size/1024 - 1 : 255),
|
||||
.max_send_size = (uint8_t)(max_send_size <= 256*1024 ? max_send_size/1024 - 1 : 255),
|
||||
.max_recv_size = (uint8_t)(max_recv_size <= 256*1024 ? max_recv_size/1024 - 1 : 255),
|
||||
};
|
||||
rdma_conn_param conn_params = {
|
||||
.private_data = &private_data,
|
||||
|
@ -327,6 +328,7 @@ void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev)
|
|||
auto conn = new nfs_rdma_conn_t();
|
||||
conn->ctx = this;
|
||||
conn->id = ev->id;
|
||||
conn->cur_credit = max_iodepth;
|
||||
conn->max_send_size = max_send_size;
|
||||
conn->max_recv_size = max_recv_size;
|
||||
rdma_connections[ev->id] = conn;
|
||||
|
@ -382,7 +384,7 @@ void nfs_rdma_context_t::rdmacm_established(rdma_cm_event *ev)
|
|||
|
||||
void nfs_rdma_conn_t::post_initial_receives()
|
||||
{
|
||||
while (cur_recv < max_recv)
|
||||
for (int i = 0; i < cur_credit; i++)
|
||||
{
|
||||
auto b = create_buf(max_recv_size);
|
||||
recv_buffers.push_back(b);
|
||||
|
@ -406,6 +408,10 @@ nfs_rdma_buf_t nfs_rdma_conn_t::create_buf(size_t len)
|
|||
|
||||
void nfs_rdma_conn_t::post_recv(nfs_rdma_buf_t b)
|
||||
{
|
||||
if (client->stopped)
|
||||
{
|
||||
return;
|
||||
}
|
||||
ibv_sge sge = {
|
||||
.addr = (uintptr_t)b.buf,
|
||||
.length = (uint32_t)b.len,
|
||||
|
@ -423,7 +429,6 @@ void nfs_rdma_conn_t::post_recv(nfs_rdma_buf_t b)
|
|||
fprintf(stderr, "RDMA receive failed: %s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
cur_recv++;
|
||||
}
|
||||
|
||||
void nfs_client_t::rdma_encode_header(rpc_op_t *rop)
|
||||
|
@ -431,7 +436,7 @@ void nfs_client_t::rdma_encode_header(rpc_op_t *rop)
|
|||
rdma_msg outrmsg = {
|
||||
.rdma_xid = rop->in_rdma_msg.rdma_xid,
|
||||
.rdma_vers = rop->in_rdma_msg.rdma_vers,
|
||||
.rdma_credit = 1, // FIXME!!!11
|
||||
.rdma_credit = rdma_conn->cur_credit,
|
||||
.rdma_body = {
|
||||
.proc = rop->rdma_error ? RDMA_ERROR : RDMA_MSG,
|
||||
},
|
||||
|
@ -458,6 +463,7 @@ void nfs_client_t::rdma_queue_reply(rpc_op_t *rop)
|
|||
|
||||
void nfs_rdma_conn_t::post_send()
|
||||
{
|
||||
again:
|
||||
while (outbox.size() > outbox_pos)
|
||||
{
|
||||
auto rop = outbox[outbox_pos];
|
||||
|
@ -492,7 +498,15 @@ void nfs_rdma_conn_t::post_send()
|
|||
size_t pos = 0;
|
||||
for (unsigned i = 0; i < iov_count; i++)
|
||||
{
|
||||
assert(pos + iov_list[i].iov_len <= b.len);
|
||||
if (pos + iov_list[i].iov_len > b.len)
|
||||
{
|
||||
// Message exceeds buffer - stop the client
|
||||
auto cli = ((nfs_client_t*)rop->client);
|
||||
cli->stop();
|
||||
outbox.erase(outbox.begin()+outbox_pos, outbox.begin()+outbox_pos+1);
|
||||
cli->rdma_conn->free_rdma_rpc_op(rop);
|
||||
goto again;
|
||||
}
|
||||
memcpy(b.buf + pos, iov_list[i].iov_base, iov_list[i].iov_len);
|
||||
pos += iov_list[i].iov_len;
|
||||
}
|
||||
|
@ -542,7 +556,7 @@ void nfs_rdma_conn_t::post_send()
|
|||
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
cur_send++;
|
||||
outbox_pos++;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -579,15 +593,27 @@ void nfs_rdma_context_t::handle_io()
|
|||
if (wc[i].status != IBV_WC_SUCCESS)
|
||||
{
|
||||
fprintf(stderr, "RDMA work request failed for queue %d with status: %s, stopping client\n", wc[i].qp_num, ibv_wc_status_str(wc[i].status));
|
||||
delete conn;
|
||||
continue;
|
||||
conn->client->stop();
|
||||
// but continue to handle events to purge the queue
|
||||
}
|
||||
if (wc[i].wr_id == 1)
|
||||
{
|
||||
// 1 = receive
|
||||
conn->cur_recv--;
|
||||
auto & b = conn->recv_buffers[conn->next_recv_buf];
|
||||
auto is_continued = conn->handle_recv(b.buf, wc[i].byte_len);
|
||||
bool is_continued = false;
|
||||
if (conn->cur_credit > 0 && !conn->client->stopped)
|
||||
{
|
||||
// Increase client refcount while the RPC call is being processed
|
||||
conn->client->refs++;
|
||||
conn->cur_credit--;
|
||||
is_continued = conn->handle_recv(b.buf, wc[i].byte_len);
|
||||
}
|
||||
else
|
||||
{
|
||||
fprintf(stderr, "Warning: NFS client credit exceeded for queue %d, stopping client\n", wc[i].qp_num);
|
||||
conn->client->stop();
|
||||
continue;
|
||||
}
|
||||
if (is_continued)
|
||||
{
|
||||
// Buffer is required to handle request
|
||||
|
@ -607,24 +633,9 @@ void nfs_rdma_context_t::handle_io()
|
|||
// 2 = send
|
||||
auto rop = conn->outbox[0];
|
||||
conn->outbox.erase(conn->outbox.begin(), conn->outbox.begin()+1);
|
||||
conn->outbox_pos--;
|
||||
// Free rpc_op
|
||||
xdr_reset(rop->xdrs);
|
||||
proxy->xdr_pool.push_back(rop->xdrs);
|
||||
if (rop->buffer && rop->referenced)
|
||||
{
|
||||
// Reuse the buffer
|
||||
auto & ub = conn->used_buffers.at(rop->buffer);
|
||||
conn->recv_buffers.push_back(ub);
|
||||
conn->post_recv(ub);
|
||||
}
|
||||
auto rdma_chunk = xdr_get_rdma_chunk(rop->xdrs);
|
||||
if (rdma_chunk)
|
||||
{
|
||||
rdma_malloc_free(alloc, rdma_chunk);
|
||||
xdr_set_rdma_chunk(rop->xdrs, NULL);
|
||||
}
|
||||
free(rop);
|
||||
conn->post_send();
|
||||
conn->free_rdma_rpc_op(rop);
|
||||
}
|
||||
else if (wc[i].wr_id == 3)
|
||||
{
|
||||
|
@ -644,6 +655,28 @@ void nfs_rdma_context_t::handle_io()
|
|||
} while (event_count > 0);
|
||||
}
|
||||
|
||||
void nfs_rdma_conn_t::free_rdma_rpc_op(rpc_op_t *rop)
|
||||
{
|
||||
xdr_reset(rop->xdrs);
|
||||
ctx->proxy->xdr_pool.push_back(rop->xdrs);
|
||||
if (rop->buffer && rop->referenced)
|
||||
{
|
||||
// Reuse the buffer
|
||||
auto & ub = used_buffers.at(rop->buffer);
|
||||
recv_buffers.push_back(ub);
|
||||
post_recv(ub);
|
||||
}
|
||||
auto rdma_chunk = xdr_get_rdma_chunk(rop->xdrs);
|
||||
if (rdma_chunk)
|
||||
{
|
||||
rdma_malloc_free(ctx->alloc, rdma_chunk);
|
||||
xdr_set_rdma_chunk(rop->xdrs, NULL);
|
||||
}
|
||||
free(rop);
|
||||
cur_credit++;
|
||||
client->deref();
|
||||
}
|
||||
|
||||
// returns false if handling is done, returns true if handling is continued asynchronously
|
||||
bool nfs_rdma_conn_t::handle_recv(void *buf, size_t len)
|
||||
{
|
||||
|
@ -750,7 +783,6 @@ bool nfs_rdma_conn_t::handle_recv(void *buf, size_t len)
|
|||
fprintf(stderr, "RDMA receive failed: %s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
cur_send++;
|
||||
xdr_set_rdma_chunk(rop->xdrs, buf);
|
||||
rop->referenced = 1;
|
||||
chunk_inbox.push_back(rop);
|
||||
|
@ -764,4 +796,13 @@ void *nfs_client_t::rdma_malloc(size_t size)
|
|||
return rdma_malloc_alloc(rdma_conn->ctx->alloc, size);
|
||||
}
|
||||
|
||||
void nfs_client_t::destroy_rdma_conn()
|
||||
{
|
||||
if (rdma_conn)
|
||||
{
|
||||
delete rdma_conn;
|
||||
rdma_conn = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue