Compare commits

..

1 Commits

Author SHA1 Message Date
Vitaliy Filippov ddbaa802be WIP NFS RDMA support
Test / test_rebalance_verify_ec (push) Successful in 1m46s Details
Test / test_rebalance_verify_ec_imm (push) Successful in 1m48s Details
Test / test_write_no_same (push) Successful in 8s Details
Test / test_write (push) Successful in 30s Details
Test / test_switch_primary (push) Successful in 33s Details
Test / test_write_xor (push) Successful in 34s Details
Test / test_heal_pg_size_2 (push) Successful in 2m17s Details
Test / test_heal_ec (push) Successful in 2m16s Details
Test / test_heal_antietcd (push) Successful in 2m17s Details
Test / test_heal_csum_32k_dmj (push) Successful in 2m20s Details
Test / test_heal_csum_32k_dj (push) Successful in 2m22s Details
Test / test_heal_csum_32k (push) Successful in 2m18s Details
Test / test_heal_csum_4k_dmj (push) Successful in 2m17s Details
Test / test_heal_csum_4k_dj (push) Successful in 2m19s Details
Test / test_resize_auto (push) Successful in 8s Details
Test / test_resize (push) Successful in 15s Details
Test / test_osd_tags (push) Successful in 8s Details
Test / test_enospc (push) Successful in 10s Details
Test / test_snapshot_pool2 (push) Successful in 16s Details
Test / test_enospc_xor (push) Successful in 12s Details
Test / test_enospc_imm (push) Successful in 11s Details
Test / test_enospc_imm_xor (push) Successful in 13s Details
Test / test_scrub (push) Successful in 16s Details
Test / test_scrub_zero_osd_2 (push) Successful in 13s Details
Test / test_scrub_xor (push) Successful in 14s Details
Test / test_scrub_pg_size_3 (push) Successful in 15s Details
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 15s Details
Test / test_scrub_ec (push) Successful in 16s Details
Test / test_nfs (push) Successful in 12s Details
Test / test_heal_csum_4k (push) Successful in 2m12s Details
2024-11-25 01:53:05 +03:00
5 changed files with 48 additions and 96 deletions

View File

@ -218,5 +218,5 @@ All other client-side components are based on the client library:
- Deleting images in a degraded cluster may currently lead to objects reappearing - Deleting images in a degraded cluster may currently lead to objects reappearing
after dead OSDs come back, and in case of erasure-coded pools, they may even after dead OSDs come back, and in case of erasure-coded pools, they may even
reappear as incomplete. Just repeat the removal request again in this case. reappear as incomplete. Just repeat the removal request again in this case.
This problem will be fixed in the future, along with the metadata disk storage This problem will be fixed in the nearest future, the fix is already implemented
format update. in the "epoch-deletions" branch.

View File

@ -24,8 +24,8 @@
Один OSD управляет одним диском (или разделом). OSD общаются с etcd и друг с другом — от etcd они Один OSD управляет одним диском (или разделом). OSD общаются с etcd и друг с другом — от etcd они
получают состояние кластера, а друг другу передают запросы записи и чтения вторичных копий данных. получают состояние кластера, а друг другу передают запросы записи и чтения вторичных копий данных.
- **etcd** — кластерная key/value база данных, используется для хранения настроек и верхнеуровневого - **etcd** — кластерная key/value база данных, используется для хранения настроек и верхнеуровневого
состояния кластера, а также предотвращения разделения сознания (splitbrain). Блоки данных в etcd не состояния кластера, а также предотвращения разделения сознания. Блоки данных в etcd не хранятся,
хранятся, в обработке клиентских запросов чтения и записи etcd не участвует. в обработке клиентских запросов чтения и записи etcd не участвует.
- **Монитор** — отдельный демон на node.js, рассчитывающий необходимые изменения в конфигурацию - **Монитор** — отдельный демон на node.js, рассчитывающий необходимые изменения в конфигурацию
кластера, сохраняющий эту информацию в etcd и таким образом командующий OSD применить эти изменения. кластера, сохраняющий эту информацию в etcd и таким образом командующий OSD применить эти изменения.
Также агрегирует статистику. Контактирует только с etcd, OSD с монитором не общаются. Также агрегирует статистику. Контактирует только с etcd, OSD с монитором не общаются.
@ -43,8 +43,8 @@
## Клиентские компоненты ## Клиентские компоненты
- **Клиентская библиотека** — инкапсулирует логику на стороне клиента. Соединяется с etcd и со всеми OSD, - **Клиентская библиотека** — инкапсулирует логику на стороне клиента. Соединяются с etcd и со всеми OSD,
от etcd получает состояние кластера, команды чтения и записи отправляет на все OSD напрямую. от etcd получают состояние кластера, команды чтения и записи отправляют на все OSD напрямую.
В силу архитектуры все отдельные блоки данных (по умолчанию по 128 КБ) располагается на разных В силу архитектуры все отдельные блоки данных (по умолчанию по 128 КБ) располагается на разных
OSD, но клиент устроен так, что всегда точно знает, к какому OSD обращаться, и подключается OSD, но клиент устроен так, что всегда точно знает, к какому OSD обращаться, и подключается
к нему напрямую. к нему напрямую.
@ -227,5 +227,5 @@
- Удаление образов в деградированном кластере может в данный момент приводить к повторному - Удаление образов в деградированном кластере может в данный момент приводить к повторному
"появлению" удалённых объектов после поднятия отключённых OSD, причём в случае EC-пулов, "появлению" удалённых объектов после поднятия отключённых OSD, причём в случае EC-пулов,
объекты могут появиться в виде "неполных". Если вы столкнётесь с такой ситуацией, просто объекты могут появиться в виде "неполных". Если вы столкнётесь с такой ситуацией, просто
повторите запрос удаления. Данная проблема будет исправлена в будущем вместе с обновлением повторите запрос удаления. Исправление этой проблемы уже реализовано в ветке "epoch-deletions"
дискового формата хранения метаданных. и вскоре будет включено в релиз.

View File

@ -784,17 +784,11 @@ void nfs_client_t::stop()
stopped = true; stopped = true;
if (refs <= 0) if (refs <= 0)
{ {
#ifdef WITH_RDMACM
destroy_rdma_conn();
#endif
auto parent = this->parent; auto parent = this->parent;
parent->rpc_clients.erase(this); parent->rpc_clients.erase(this);
parent->active_connections--; parent->active_connections--;
if (nfs_fd >= 0) parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
{ close(nfs_fd);
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
close(nfs_fd);
}
delete this; delete this;
parent->check_exit(); parent->check_exit();
} }

View File

@ -117,7 +117,7 @@ public:
nfs_rdma_conn_t *rdma_conn = NULL; nfs_rdma_conn_t *rdma_conn = NULL;
// <TCP> // <TCP>
int nfs_fd = -1; int nfs_fd;
int epoll_events = 0; int epoll_events = 0;
// Read state // Read state
@ -149,5 +149,4 @@ public:
void *rdma_malloc(size_t size); void *rdma_malloc(size_t size);
void rdma_encode_header(rpc_op_t *rop); void rdma_encode_header(rpc_op_t *rop);
void rdma_queue_reply(rpc_op_t *rop); void rdma_queue_reply(rpc_op_t *rop);
void destroy_rdma_conn();
}; };

View File

@ -44,7 +44,7 @@ struct nfs_rdma_context_t
{ {
std::string bind_address; std::string bind_address;
int rdmacm_port = 0; int rdmacm_port = 0;
uint32_t max_iodepth = 16; int max_send = 8, max_recv = 8; // FIXME max_send and max_recv should probably be equal
uint64_t rdma_malloc_round_to = 1048576, rdma_max_unused_buffers = 500*1048576; uint64_t rdma_malloc_round_to = 1048576, rdma_max_unused_buffers = 500*1048576;
uint64_t max_send_size = 256*1024, max_recv_size = 256*1024; uint64_t max_send_size = 256*1024, max_recv_size = 256*1024;
@ -72,11 +72,12 @@ struct nfs_rdma_conn_t
nfs_rdma_context_t *ctx = NULL; nfs_rdma_context_t *ctx = NULL;
nfs_client_t *client = NULL; nfs_client_t *client = NULL;
rdma_cm_id *id = NULL; rdma_cm_id *id = NULL;
int max_send = 8, max_recv = 8; // FIXME set it
int max_send_size = 256*1024, max_recv_size = 256*1024; int max_send_size = 256*1024, max_recv_size = 256*1024;
int remote_max_send_size = 1024, remote_max_recv_size = 1024; int remote_max_send_size = 1024, remote_max_recv_size = 1024; // FIXME is it used?
bool remote_invalidate = false; bool remote_invalidate = false;
bool established = false; bool established = false;
uint32_t cur_credit = 16; int cur_recv = 0, cur_send = 0; // FIXME use it ...
std::vector<nfs_rdma_buf_t> recv_buffers; std::vector<nfs_rdma_buf_t> recv_buffers;
std::map<void*, nfs_rdma_buf_t> used_buffers; std::map<void*, nfs_rdma_buf_t> used_buffers;
int next_recv_buf = 0; int next_recv_buf = 0;
@ -91,7 +92,6 @@ struct nfs_rdma_conn_t
void post_recv(nfs_rdma_buf_t b); void post_recv(nfs_rdma_buf_t b);
void post_send(); void post_send();
bool handle_recv(void *buf, size_t len); bool handle_recv(void *buf, size_t len);
void free_rdma_rpc_op(rpc_op_t *rop);
}; };
nfs_rdma_context_t* nfs_proxy_t::create_rdma(const std::string & bind_address, int rdmacm_port) nfs_rdma_context_t* nfs_proxy_t::create_rdma(const std::string & bind_address, int rdmacm_port)
@ -240,7 +240,7 @@ void nfs_rdma_context_t::handle_rdmacm_events()
fprintf(stderr, "Received %s event for connection 0x%lx - closing it\n", fprintf(stderr, "Received %s event for connection 0x%lx - closing it\n",
event_type_name, (uint64_t)ev->id); event_type_name, (uint64_t)ev->id);
auto conn = conn_it->second; auto conn = conn_it->second;
conn->client->stop(); delete conn;
} }
} }
else if (ev->event == RDMA_CM_EVENT_ESTABLISHED) else if (ev->event == RDMA_CM_EVENT_ESTABLISHED)
@ -267,7 +267,7 @@ void nfs_rdma_context_t::handle_rdmacm_events()
void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev) void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev)
{ {
this->used_max_cqe += max_iodepth*2; this->used_max_cqe += max_send+max_recv;
if (this->used_max_cqe > this->max_cqe) if (this->used_max_cqe > this->max_cqe)
{ {
// Resize CQ // Resize CQ
@ -287,9 +287,8 @@ void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev)
.send_cq = this->cq, .send_cq = this->cq,
.recv_cq = this->cq, .recv_cq = this->cq,
.cap = { .cap = {
// each op at each moment takes 1 RDMA_RECV or 1 RDMA_READ or 1 RDMA_WRITE + 1 RDMA_SEND .max_send_wr = max_send*2, // FIXME how many max_send/max_recv?
.max_send_wr = max_iodepth*2, .max_recv_wr = max_recv,
.max_recv_wr = max_iodepth,
.max_send_sge = 1, // we don't need S/G currently .max_send_sge = 1, // we don't need S/G currently
.max_recv_sge = 1, .max_recv_sge = 1,
}, },
@ -305,8 +304,8 @@ void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev)
.format_identifier = NFS_RDMACM_PRIVATE_DATA_MAGIC_LE, .format_identifier = NFS_RDMACM_PRIVATE_DATA_MAGIC_LE,
.version = 1, .version = 1,
.remote_invalidate = 0, // FIXME what is remote_invalidate? .remote_invalidate = 0, // FIXME what is remote_invalidate?
.max_send_size = (uint8_t)(max_send_size <= 256*1024 ? max_send_size/1024 - 1 : 255), .max_send_size = (max_send_size <= 256*1024 ? max_send_size/1024 - 1 : 255),
.max_recv_size = (uint8_t)(max_recv_size <= 256*1024 ? max_recv_size/1024 - 1 : 255), .max_recv_size = (max_recv_size <= 256*1024 ? max_recv_size/1024 - 1 : 255),
}; };
rdma_conn_param conn_params = { rdma_conn_param conn_params = {
.private_data = &private_data, .private_data = &private_data,
@ -328,7 +327,6 @@ void nfs_rdma_context_t::rdmacm_accept(rdma_cm_event *ev)
auto conn = new nfs_rdma_conn_t(); auto conn = new nfs_rdma_conn_t();
conn->ctx = this; conn->ctx = this;
conn->id = ev->id; conn->id = ev->id;
conn->cur_credit = max_iodepth;
conn->max_send_size = max_send_size; conn->max_send_size = max_send_size;
conn->max_recv_size = max_recv_size; conn->max_recv_size = max_recv_size;
rdma_connections[ev->id] = conn; rdma_connections[ev->id] = conn;
@ -384,7 +382,7 @@ void nfs_rdma_context_t::rdmacm_established(rdma_cm_event *ev)
void nfs_rdma_conn_t::post_initial_receives() void nfs_rdma_conn_t::post_initial_receives()
{ {
for (int i = 0; i < cur_credit; i++) while (cur_recv < max_recv)
{ {
auto b = create_buf(max_recv_size); auto b = create_buf(max_recv_size);
recv_buffers.push_back(b); recv_buffers.push_back(b);
@ -408,10 +406,6 @@ nfs_rdma_buf_t nfs_rdma_conn_t::create_buf(size_t len)
void nfs_rdma_conn_t::post_recv(nfs_rdma_buf_t b) void nfs_rdma_conn_t::post_recv(nfs_rdma_buf_t b)
{ {
if (client->stopped)
{
return;
}
ibv_sge sge = { ibv_sge sge = {
.addr = (uintptr_t)b.buf, .addr = (uintptr_t)b.buf,
.length = (uint32_t)b.len, .length = (uint32_t)b.len,
@ -429,6 +423,7 @@ void nfs_rdma_conn_t::post_recv(nfs_rdma_buf_t b)
fprintf(stderr, "RDMA receive failed: %s\n", strerror(err)); fprintf(stderr, "RDMA receive failed: %s\n", strerror(err));
exit(1); exit(1);
} }
cur_recv++;
} }
void nfs_client_t::rdma_encode_header(rpc_op_t *rop) void nfs_client_t::rdma_encode_header(rpc_op_t *rop)
@ -436,7 +431,7 @@ void nfs_client_t::rdma_encode_header(rpc_op_t *rop)
rdma_msg outrmsg = { rdma_msg outrmsg = {
.rdma_xid = rop->in_rdma_msg.rdma_xid, .rdma_xid = rop->in_rdma_msg.rdma_xid,
.rdma_vers = rop->in_rdma_msg.rdma_vers, .rdma_vers = rop->in_rdma_msg.rdma_vers,
.rdma_credit = rdma_conn->cur_credit, .rdma_credit = 1, // FIXME!!!11
.rdma_body = { .rdma_body = {
.proc = rop->rdma_error ? RDMA_ERROR : RDMA_MSG, .proc = rop->rdma_error ? RDMA_ERROR : RDMA_MSG,
}, },
@ -463,7 +458,6 @@ void nfs_client_t::rdma_queue_reply(rpc_op_t *rop)
void nfs_rdma_conn_t::post_send() void nfs_rdma_conn_t::post_send()
{ {
again:
while (outbox.size() > outbox_pos) while (outbox.size() > outbox_pos)
{ {
auto rop = outbox[outbox_pos]; auto rop = outbox[outbox_pos];
@ -498,15 +492,7 @@ again:
size_t pos = 0; size_t pos = 0;
for (unsigned i = 0; i < iov_count; i++) for (unsigned i = 0; i < iov_count; i++)
{ {
if (pos + iov_list[i].iov_len > b.len) assert(pos + iov_list[i].iov_len <= b.len);
{
// Message exceeds buffer - stop the client
auto cli = ((nfs_client_t*)rop->client);
cli->stop();
outbox.erase(outbox.begin()+outbox_pos, outbox.begin()+outbox_pos+1);
cli->rdma_conn->free_rdma_rpc_op(rop);
goto again;
}
memcpy(b.buf + pos, iov_list[i].iov_base, iov_list[i].iov_len); memcpy(b.buf + pos, iov_list[i].iov_base, iov_list[i].iov_len);
pos += iov_list[i].iov_len; pos += iov_list[i].iov_len;
} }
@ -556,7 +542,7 @@ again:
fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
exit(1); exit(1);
} }
outbox_pos++; cur_send++;
} }
} }
@ -593,27 +579,15 @@ void nfs_rdma_context_t::handle_io()
if (wc[i].status != IBV_WC_SUCCESS) if (wc[i].status != IBV_WC_SUCCESS)
{ {
fprintf(stderr, "RDMA work request failed for queue %d with status: %s, stopping client\n", wc[i].qp_num, ibv_wc_status_str(wc[i].status)); fprintf(stderr, "RDMA work request failed for queue %d with status: %s, stopping client\n", wc[i].qp_num, ibv_wc_status_str(wc[i].status));
conn->client->stop(); delete conn;
// but continue to handle events to purge the queue continue;
} }
if (wc[i].wr_id == 1) if (wc[i].wr_id == 1)
{ {
// 1 = receive // 1 = receive
conn->cur_recv--;
auto & b = conn->recv_buffers[conn->next_recv_buf]; auto & b = conn->recv_buffers[conn->next_recv_buf];
bool is_continued = false; auto is_continued = conn->handle_recv(b.buf, wc[i].byte_len);
if (conn->cur_credit > 0 && !conn->client->stopped)
{
// Increase client refcount while the RPC call is being processed
conn->client->refs++;
conn->cur_credit--;
is_continued = conn->handle_recv(b.buf, wc[i].byte_len);
}
else
{
fprintf(stderr, "Warning: NFS client credit exceeded for queue %d, stopping client\n", wc[i].qp_num);
conn->client->stop();
continue;
}
if (is_continued) if (is_continued)
{ {
// Buffer is required to handle request // Buffer is required to handle request
@ -633,9 +607,24 @@ void nfs_rdma_context_t::handle_io()
// 2 = send // 2 = send
auto rop = conn->outbox[0]; auto rop = conn->outbox[0];
conn->outbox.erase(conn->outbox.begin(), conn->outbox.begin()+1); conn->outbox.erase(conn->outbox.begin(), conn->outbox.begin()+1);
conn->outbox_pos--;
// Free rpc_op // Free rpc_op
conn->free_rdma_rpc_op(rop); xdr_reset(rop->xdrs);
proxy->xdr_pool.push_back(rop->xdrs);
if (rop->buffer && rop->referenced)
{
// Reuse the buffer
auto & ub = conn->used_buffers.at(rop->buffer);
conn->recv_buffers.push_back(ub);
conn->post_recv(ub);
}
auto rdma_chunk = xdr_get_rdma_chunk(rop->xdrs);
if (rdma_chunk)
{
rdma_malloc_free(alloc, rdma_chunk);
xdr_set_rdma_chunk(rop->xdrs, NULL);
}
free(rop);
conn->post_send();
} }
else if (wc[i].wr_id == 3) else if (wc[i].wr_id == 3)
{ {
@ -655,28 +644,6 @@ void nfs_rdma_context_t::handle_io()
} while (event_count > 0); } while (event_count > 0);
} }
void nfs_rdma_conn_t::free_rdma_rpc_op(rpc_op_t *rop)
{
xdr_reset(rop->xdrs);
ctx->proxy->xdr_pool.push_back(rop->xdrs);
if (rop->buffer && rop->referenced)
{
// Reuse the buffer
auto & ub = used_buffers.at(rop->buffer);
recv_buffers.push_back(ub);
post_recv(ub);
}
auto rdma_chunk = xdr_get_rdma_chunk(rop->xdrs);
if (rdma_chunk)
{
rdma_malloc_free(ctx->alloc, rdma_chunk);
xdr_set_rdma_chunk(rop->xdrs, NULL);
}
free(rop);
cur_credit++;
client->deref();
}
// returns false if handling is done, returns true if handling is continued asynchronously // returns false if handling is done, returns true if handling is continued asynchronously
bool nfs_rdma_conn_t::handle_recv(void *buf, size_t len) bool nfs_rdma_conn_t::handle_recv(void *buf, size_t len)
{ {
@ -783,6 +750,7 @@ bool nfs_rdma_conn_t::handle_recv(void *buf, size_t len)
fprintf(stderr, "RDMA receive failed: %s\n", strerror(err)); fprintf(stderr, "RDMA receive failed: %s\n", strerror(err));
exit(1); exit(1);
} }
cur_send++;
xdr_set_rdma_chunk(rop->xdrs, buf); xdr_set_rdma_chunk(rop->xdrs, buf);
rop->referenced = 1; rop->referenced = 1;
chunk_inbox.push_back(rop); chunk_inbox.push_back(rop);
@ -796,13 +764,4 @@ void *nfs_client_t::rdma_malloc(size_t size)
return rdma_malloc_alloc(rdma_conn->ctx->alloc, size); return rdma_malloc_alloc(rdma_conn->ctx->alloc, size);
} }
void nfs_client_t::destroy_rdma_conn()
{
if (rdma_conn)
{
delete rdma_conn;
rdma_conn = NULL;
}
}
#endif #endif