Compare commits

...

13 Commits

26 changed files with 181 additions and 151 deletions

View File

@@ -48,9 +48,9 @@ Vitastor, составлены для того, чтобы убедиться,
интерфейс (прокси), опять же, без открытия в свободный публичный доступ как интерфейс (прокси), опять же, без открытия в свободный публичный доступ как
самой программы, так и прокси. самой программы, так и прокси.
Сетевая Публичная Лицензия Vitastor разработана специально чтобы Сетевая Публичная Лицензия Vitastor разработана специально, чтобы
гарантировать, что в таких случаях и модифицированная версия программы, и гарантировать, что в таких случаях и модифицированная версия программы, и
прокси оставались доступными сообществу. Для этого лицензия требует от прокси останутся доступными сообществу. Для этого лицензия требует от
операторов сетевых серверов предоставлять исходный код оригинальной программы, операторов сетевых серверов предоставлять исходный код оригинальной программы,
а также всех других программ, взаимодействующих с ней на их серверах, а также всех других программ, взаимодействующих с ней на их серверах,
пользователям этих серверов, на условиях свободных лицензий. Таким образом, пользователям этих серверов, на условиях свободных лицензий. Таким образом,

2
json11

Submodule json11 updated: 52a3af664f...fd37016cf8

View File

@@ -70,9 +70,9 @@ const etcd_tree = {
rdma_gid_index: 0, rdma_gid_index: 0,
rdma_mtu: 4096, rdma_mtu: 4096,
rdma_max_sge: 128, rdma_max_sge: 128,
rdma_max_send: 32, rdma_max_send: 64,
rdma_max_recv: 8, rdma_max_recv: 128,
rdma_max_msg: 1048576, rdma_max_msg: 132096,
log_level: 0, log_level: 0,
block_size: 131072, block_size: 131072,
disk_alignment: 4096, disk_alignment: 4096,
@@ -261,7 +261,7 @@ const etcd_tree = {
/* <pool_id>: { /* <pool_id>: {
<pg_id>: { <pg_id>: {
primary: osd_num_t, primary: osd_num_t,
state: ("starting"|"peering"|"peered"|"incomplete"|"active"|"repeering"|"stopping"|"offline"| state: ("starting"|"peering"|"incomplete"|"active"|"repeering"|"stopping"|"offline"|
"degraded"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"| "degraded"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"|
"has_invalid"|"left_on_dead")[], "has_invalid"|"left_on_dead")[],
} }

View File

@@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 2.8)
project(vitastor) project(vitastor)
include(GNUInstallDirs) include(GNUInstallDirs)
include(CTest)
set(WITH_QEMU false CACHE BOOL "Build QEMU driver inside Vitastor source tree") set(WITH_QEMU false CACHE BOOL "Build QEMU driver inside Vitastor source tree")
set(WITH_FIO true CACHE BOOL "Build FIO driver") set(WITH_FIO true CACHE BOOL "Build FIO driver")
@@ -55,6 +56,14 @@ if (ISAL_LIBRARIES)
add_definitions(-DWITH_ISAL) add_definitions(-DWITH_ISAL)
endif (ISAL_LIBRARIES) endif (ISAL_LIBRARIES)
add_custom_target(build_tests)
add_custom_target(test
COMMAND
echo leak:tcmalloc > ${CMAKE_CURRENT_BINARY_DIR}/lsan-suppress.txt &&
env LSAN_OPTIONS=suppressions=${CMAKE_CURRENT_BINARY_DIR}/lsan-suppress.txt ${CMAKE_CTEST_COMMAND}
)
add_dependencies(test build_tests)
include_directories( include_directories(
../ ../
/usr/include/jerasure /usr/include/jerasure
@@ -234,14 +243,17 @@ add_executable(osd_test osd_test.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(osd_test tcmalloc_minimal) target_link_libraries(osd_test tcmalloc_minimal)
# osd_rmw_test # osd_rmw_test
# FIXME: Move to tests add_executable(osd_rmw_test EXCLUDE_FROM_ALL osd_rmw_test.cpp allocator.cpp)
add_executable(osd_rmw_test osd_rmw_test.cpp allocator.cpp)
target_link_libraries(osd_rmw_test Jerasure ${ISAL_LIBRARIES} tcmalloc_minimal) target_link_libraries(osd_rmw_test Jerasure ${ISAL_LIBRARIES} tcmalloc_minimal)
add_dependencies(build_tests osd_rmw_test)
add_test(NAME osd_rmw_test COMMAND osd_rmw_test)
if (ISAL_LIBRARIES) if (ISAL_LIBRARIES)
add_executable(osd_rmw_test_je osd_rmw_test.cpp allocator.cpp) add_executable(osd_rmw_test_je EXCLUDE_FROM_ALL osd_rmw_test.cpp allocator.cpp)
target_compile_definitions(osd_rmw_test_je PUBLIC -DNO_ISAL) target_compile_definitions(osd_rmw_test_je PUBLIC -DNO_ISAL)
target_link_libraries(osd_rmw_test_je Jerasure tcmalloc_minimal) target_link_libraries(osd_rmw_test_je Jerasure tcmalloc_minimal)
add_dependencies(build_tests osd_rmw_test_je)
add_test(NAME osd_rmw_test_jerasure COMMAND osd_rmw_test_je)
endif (ISAL_LIBRARIES) endif (ISAL_LIBRARIES)
# stub_uring_osd # stub_uring_osd
@@ -256,11 +268,15 @@ target_link_libraries(stub_uring_osd
) )
# osd_peering_pg_test # osd_peering_pg_test
add_executable(osd_peering_pg_test osd_peering_pg_test.cpp osd_peering_pg.cpp) add_executable(osd_peering_pg_test EXCLUDE_FROM_ALL osd_peering_pg_test.cpp osd_peering_pg.cpp)
target_link_libraries(osd_peering_pg_test tcmalloc_minimal) target_link_libraries(osd_peering_pg_test tcmalloc_minimal)
add_dependencies(build_tests osd_peering_pg_test)
add_test(NAME osd_peering_pg_test COMMAND osd_peering_pg_test)
# test_allocator # test_allocator
add_executable(test_allocator test_allocator.cpp allocator.cpp) add_executable(test_allocator EXCLUDE_FROM_ALL test_allocator.cpp allocator.cpp)
add_dependencies(build_tests test_allocator)
add_test(NAME test_allocator COMMAND test_allocator)
# test_cas # test_cas
add_executable(test_cas add_executable(test_cas
@@ -280,12 +296,15 @@ target_link_libraries(test_crc32
# test_cluster_client # test_cluster_client
add_executable(test_cluster_client add_executable(test_cluster_client
EXCLUDE_FROM_ALL
test_cluster_client.cpp test_cluster_client.cpp
pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
etcd_state_client.cpp timerfd_manager.cpp ../json11/json11.cpp etcd_state_client.cpp timerfd_manager.cpp ../json11/json11.cpp
) )
target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__) target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__)
target_include_directories(test_cluster_client PUBLIC ${CMAKE_SOURCE_DIR}/src/mock) target_include_directories(test_cluster_client PUBLIC ${CMAKE_SOURCE_DIR}/src/mock)
add_dependencies(build_tests test_cluster_client)
add_test(NAME test_cluster_client COMMAND test_cluster_client)
## test_blockstore, test_shit ## test_blockstore, test_shit
#add_executable(test_blockstore test_blockstore.cpp) #add_executable(test_blockstore test_blockstore.cpp)

View File

@@ -325,7 +325,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
{ {
// Basic verification not passed // Basic verification not passed
op->retval = -EINVAL; op->retval = -EINVAL;
std::function<void (blockstore_op_t*)>(op->callback)(op); ringloop->set_immediate([op]() { std::function<void (blockstore_op_t*)>(op->callback)(op); });
return; return;
} }
if (op->opcode == BS_OP_SYNC_STAB_ALL) if (op->opcode == BS_OP_SYNC_STAB_ALL)
@@ -368,7 +368,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
} }
if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE) && !enqueue_write(op)) if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE) && !enqueue_write(op))
{ {
std::function<void (blockstore_op_t*)>(op->callback)(op); ringloop->set_immediate([op]() { std::function<void (blockstore_op_t*)>(op->callback)(op); });
return; return;
} }
// Call constructor without allocating memory. We'll call destructor before returning op back // Call constructor without allocating memory. We'll call destructor before returning op back

View File

@@ -121,8 +121,7 @@ resume_1:
} }
if (pool_cfg.scheme != POOL_SCHEME_REPLICATED) if (pool_cfg.scheme != POOL_SCHEME_REPLICATED)
{ {
uint64_t pg_real_size = pool_stats[pool_cfg.id]["pg_real_size"].uint64_value(); pool_avail *= (pool_cfg.pg_size - pool_cfg.parity_chunks);
pool_avail = pg_real_size > 0 ? pool_avail * (pool_cfg.pg_size - pool_cfg.parity_chunks) / pg_real_size : 0;
} }
pool_stats[pool_cfg.id] = json11::Json::object { pool_stats[pool_cfg.id] = json11::Json::object {
{ "name", pool_cfg.name }, { "name", pool_cfg.name },

View File

@@ -92,6 +92,7 @@ struct rm_inode_t
void send_ops(rm_pg_t *cur_list) void send_ops(rm_pg_t *cur_list)
{ {
parent->cli->init_msgr();
if (parent->cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) == if (parent->cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) ==
parent->cli->msgr.osd_peer_fds.end()) parent->cli->msgr.osd_peer_fds.end())
{ {

View File

@@ -59,7 +59,6 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
delete op; delete op;
}; };
msgr.parse_config(this->config); msgr.parse_config(this->config);
msgr.init();
st_cli.tfd = tfd; st_cli.tfd = tfd;
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
@@ -73,17 +72,6 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
scrap_buffer_size = SCRAP_BUFFER_SIZE; scrap_buffer_size = SCRAP_BUFFER_SIZE;
scrap_buffer = malloc_or_die(scrap_buffer_size); scrap_buffer = malloc_or_die(scrap_buffer_size);
if (ringloop)
{
consumer.loop = [this]()
{
msgr.read_requests();
msgr.send_replies();
this->ringloop->submit();
};
ringloop->register_consumer(&consumer);
}
} }
cluster_client_t::~cluster_client_t() cluster_client_t::~cluster_client_t()
@@ -115,6 +103,24 @@ cluster_op_t::~cluster_op_t()
} }
} }
void cluster_client_t::init_msgr()
{
if (msgr_initialized)
return;
msgr.init();
msgr_initialized = true;
if (ringloop)
{
consumer.loop = [this]()
{
msgr.read_requests();
msgr.send_replies();
this->ringloop->submit();
};
ringloop->register_consumer(&consumer);
}
}
void cluster_client_t::calc_wait(cluster_op_t *op) void cluster_client_t::calc_wait(cluster_op_t *op)
{ {
op->prev_wait = 0; op->prev_wait = 0;
@@ -223,11 +229,14 @@ void cluster_client_t::erase_op(cluster_op_t *op)
if (op_queue_tail == op) if (op_queue_tail == op)
op_queue_tail = op->prev; op_queue_tail = op->prev;
op->next = op->prev = NULL; op->next = op->prev = NULL;
if (flags & OP_FLUSH_BUFFER)
std::function<void(cluster_op_t*)>(op->callback)(op);
if (!(flags & OP_IMMEDIATE_COMMIT)) if (!(flags & OP_IMMEDIATE_COMMIT))
inc_wait(opcode, flags, next, -1); inc_wait(opcode, flags, next, -1);
// Call callback at the end to avoid inconsistencies in prev_wait // Call callback at the end to avoid inconsistencies in prev_wait
// if the callback adds more operations itself // if the callback adds more operations itself
std::function<void(cluster_op_t*)>(op->callback)(op); if (!(flags & OP_FLUSH_BUFFER))
std::function<void(cluster_op_t*)>(op->callback)(op);
} }
void cluster_client_t::continue_ops(bool up_retry) void cluster_client_t::continue_ops(bool up_retry)
@@ -918,6 +927,10 @@ bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len
bool cluster_client_t::try_send(cluster_op_t *op, int i) bool cluster_client_t::try_send(cluster_op_t *op, int i)
{ {
if (!msgr_initialized)
{
init_msgr();
}
auto part = &op->parts[i]; auto part = &op->parts[i];
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode)); auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode));
auto pg_it = pool_cfg.pg_config.find(part->pg_num); auto pg_it = pool_cfg.pg_config.find(part->pg_num);

View File

@@ -104,10 +104,14 @@ class cluster_client_t
std::vector<std::function<void(void)>> on_ready_hooks; std::vector<std::function<void(void)>> on_ready_hooks;
std::vector<inode_list_t*> lists; std::vector<inode_list_t*> lists;
int continuing_ops = 0; int continuing_ops = 0;
bool msgr_initialized = false;
public: public:
etcd_state_client_t st_cli; etcd_state_client_t st_cli;
osd_messenger_t msgr; osd_messenger_t msgr;
void init_msgr();
json11::Json config; json11::Json config;
json11::Json::object merged_config; json11::Json::object merged_config;

View File

@@ -305,10 +305,10 @@ int write_zero(int fd, uint64_t offset, uint64_t size)
json11::Json read_parttable(std::string dev) json11::Json read_parttable(std::string dev)
{ {
std::string part_dump; std::string part_dump;
int r = shell_exec({ "sfdisk", "--dump", dev, "--json" }, "", &part_dump, NULL); int r = shell_exec({ "sfdisk", "--json", dev }, "", &part_dump, NULL);
if (r == 255) if (r == 255)
{ {
fprintf(stderr, "Error running sfdisk --dump %s --json\n", dev.c_str()); fprintf(stderr, "Error running sfdisk --json %s\n", dev.c_str());
return json11::Json(false); return json11::Json(false);
} }
// Decode partition table // Decode partition table
@@ -319,7 +319,7 @@ json11::Json read_parttable(std::string dev)
pt = json11::Json::parse(part_dump, err); pt = json11::Json::parse(part_dump, err);
if (err != "") if (err != "")
{ {
fprintf(stderr, "sfdisk --dump %s --json returned bad JSON: %s\n", dev.c_str(), part_dump.c_str()); fprintf(stderr, "sfdisk --json %s returned bad JSON: %s\n", dev.c_str(), part_dump.c_str());
return json11::Json(false); return json11::Json(false);
} }
pt = pt["partitiontable"]; pt = pt["partitiontable"];

View File

@@ -157,7 +157,7 @@ void osd_messenger_t::parse_config(const json11::Json & config)
this->rdma_max_sge = 128; this->rdma_max_sge = 128;
this->rdma_max_send = config["rdma_max_send"].uint64_value(); this->rdma_max_send = config["rdma_max_send"].uint64_value();
if (!this->rdma_max_send) if (!this->rdma_max_send)
this->rdma_max_send = 1; this->rdma_max_send = 64;
this->rdma_max_recv = config["rdma_max_recv"].uint64_value(); this->rdma_max_recv = config["rdma_max_recv"].uint64_value();
if (!this->rdma_max_recv) if (!this->rdma_max_recv)
this->rdma_max_recv = 128; this->rdma_max_recv = 128;

View File

@@ -134,10 +134,12 @@ protected:
msgr_rdma_context_t *rdma_context = NULL; msgr_rdma_context_t *rdma_context = NULL;
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0; uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
uint64_t rdma_max_msg = 0; uint64_t rdma_max_msg = 0;
std::vector<rdma_hb_t> rdma_handle_buffers;
#endif #endif
std::vector<int> read_ready_clients; std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients; std::vector<int> write_ready_clients;
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
std::vector<std::function<void()>> set_immediate; std::vector<std::function<void()>> set_immediate;
public: public:

View File

@@ -368,9 +368,8 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
bool osd_messenger_t::try_send_rdma(osd_client_t *cl) bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
{ {
auto rc = cl->rdma_conn; auto rc = cl->rdma_conn;
if (!cl->send_list.size() || rc->cur_send > 0) if (!cl->send_list.size() || rc->cur_send >= rc->max_send)
{ {
// Only send one batch at a time
return true; return true;
} }
uint64_t op_size = 0, op_sge = 0; uint64_t op_size = 0, op_sge = 0;
@@ -380,6 +379,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
iovec & iov = cl->send_list[rc->send_pos]; iovec & iov = cl->send_list[rc->send_pos];
if (op_size >= rc->max_msg || op_sge >= rc->max_sge) if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
{ {
rc->send_sizes.push_back(op_size);
try_send_rdma_wr(cl, sge, op_sge); try_send_rdma_wr(cl, sge, op_sge);
op_sge = 0; op_sge = 0;
op_size = 0; op_size = 0;
@@ -405,18 +405,24 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
} }
if (op_sge > 0) if (op_sge > 0)
{ {
rc->send_sizes.push_back(op_size);
try_send_rdma_wr(cl, sge, op_sge); try_send_rdma_wr(cl, sge, op_sge);
} }
return true; return true;
} }
static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) static void try_recv_rdma_wr(osd_client_t *cl, void *buf)
{ {
ibv_sge sge = {
.addr = (uintptr_t)buf,
.length = (uint32_t)cl->rdma_conn->max_msg,
.lkey = cl->rdma_conn->ctx->mr->lkey,
};
ibv_recv_wr *bad_wr = NULL; ibv_recv_wr *bad_wr = NULL;
ibv_recv_wr wr = { ibv_recv_wr wr = {
.wr_id = (uint64_t)(cl->peer_fd*2), .wr_id = (uint64_t)(cl->peer_fd*2),
.sg_list = sge, .sg_list = &sge,
.num_sge = op_sge, .num_sge = 1,
}; };
int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr); int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr);
if (err || bad_wr) if (err || bad_wr)
@@ -434,12 +440,7 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
{ {
void *buf = malloc_or_die(rc->max_msg); void *buf = malloc_or_die(rc->max_msg);
rc->recv_buffers.push_back(buf); rc->recv_buffers.push_back(buf);
ibv_sge sge = { try_recv_rdma_wr(cl, buf);
.addr = (uintptr_t)buf,
.length = (uint32_t)rc->max_msg,
.lkey = rc->ctx->mr->lkey,
};
try_recv_rdma_wr(cl, &sge, 1);
} }
return true; return true;
} }
@@ -476,6 +477,7 @@ void osd_messenger_t::handle_rdma_events()
continue; continue;
} }
osd_client_t *cl = cl_it->second; osd_client_t *cl = cl_it->second;
auto rc = cl->rdma_conn;
if (wc[i].status != IBV_WC_SUCCESS) if (wc[i].status != IBV_WC_SUCCESS)
{ {
fprintf(stderr, "RDMA work request failed for client %d", client_id); fprintf(stderr, "RDMA work request failed for client %d", client_id);
@@ -489,47 +491,68 @@ void osd_messenger_t::handle_rdma_events()
} }
if (!is_send) if (!is_send)
{ {
cl->rdma_conn->cur_recv--; rc->cur_recv--;
if (!handle_read_buffer(cl, cl->rdma_conn->recv_buffers[0], wc[i].byte_len)) rdma_handle_buffers.push_back((rdma_hb_t){ .peer_fd = client_id, .buf = rc->recv_buffers[0], .len = wc[i].byte_len });
{ rc->recv_buffers.erase(rc->recv_buffers.begin(), rc->recv_buffers.begin()+1);
// handle_read_buffer may stop the client
continue;
}
free(cl->rdma_conn->recv_buffers[0]);
cl->rdma_conn->recv_buffers.erase(cl->rdma_conn->recv_buffers.begin(), cl->rdma_conn->recv_buffers.begin()+1);
try_recv_rdma(cl); try_recv_rdma(cl);
} }
else else
{ {
cl->rdma_conn->cur_send--; rc->cur_send--;
if (!cl->rdma_conn->cur_send) uint64_t sent_size = rc->send_sizes.at(0);
rc->send_sizes.erase(rc->send_sizes.begin(), rc->send_sizes.begin()+1);
int send_pos = 0, send_buf_pos = 0;
while (sent_size > 0)
{ {
// Wait for the whole batch if (sent_size >= cl->send_list.at(send_pos).iov_len)
for (int i = 0; i < cl->rdma_conn->send_pos; i++)
{ {
if (cl->outbox[i].flags & MSGR_SENDP_FREE) sent_size -= cl->send_list[send_pos].iov_len;
{ send_pos++;
// Reply fully sent
delete cl->outbox[i].op;
}
} }
if (cl->rdma_conn->send_pos > 0) else
{ {
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos); send_buf_pos = sent_size;
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos); sent_size = 0;
cl->rdma_conn->send_pos = 0;
} }
if (cl->rdma_conn->send_buf_pos > 0)
{
cl->send_list[0].iov_base = (uint8_t*)cl->send_list[0].iov_base + cl->rdma_conn->send_buf_pos;
cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos;
cl->rdma_conn->send_buf_pos = 0;
}
try_send_rdma(cl);
} }
assert(rc->send_pos >= send_pos);
if (rc->send_pos == send_pos)
{
rc->send_buf_pos -= send_buf_pos;
}
rc->send_pos -= send_pos;
for (int i = 0; i < send_pos; i++)
{
if (cl->outbox[i].flags & MSGR_SENDP_FREE)
{
// Reply fully sent
delete cl->outbox[i].op;
}
}
if (send_pos > 0)
{
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos);
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos);
}
if (send_buf_pos > 0)
{
cl->send_list[0].iov_base = (uint8_t*)cl->send_list[0].iov_base + send_buf_pos;
cl->send_list[0].iov_len -= send_buf_pos;
}
try_send_rdma(cl);
} }
} }
} while (event_count > 0); } while (event_count > 0);
for (auto & hb: rdma_handle_buffers)
{
auto cl_it = clients.find(hb.peer_fd);
if (cl_it != clients.end())
{
handle_read_buffer(cl_it->second, hb.buf, hb.len);
}
free(hb.buf);
}
rdma_handle_buffers.clear();
for (auto cb: set_immediate) for (auto cb: set_immediate)
{ {
cb(); cb();

View File

@@ -49,10 +49,18 @@ struct msgr_rdma_connection_t
uint64_t max_msg = 0; uint64_t max_msg = 0;
int send_pos = 0, send_buf_pos = 0; int send_pos = 0, send_buf_pos = 0;
int recv_pos = 0, recv_buf_pos = 0; int next_recv_buf = 0;
std::vector<void*> recv_buffers; std::vector<void*> recv_buffers;
std::vector<uint64_t> send_sizes;
~msgr_rdma_connection_t(); ~msgr_rdma_connection_t();
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg); static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);
int connect(msgr_rdma_address_t *dest); int connect(msgr_rdma_address_t *dest);
}; };
struct rdma_hb_t
{
int peer_fd;
void *buf;
uint64_t len;
};

View File

@@ -683,7 +683,7 @@ void osd_t::apply_pg_config()
auto vec_all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end()); auto vec_all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end());
if (currently_taken) if (currently_taken)
{ {
if (pg_it->second.state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING | PG_REPEERING | PG_PEERED)) if (pg_it->second.state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING | PG_REPEERING))
{ {
if (pg_it->second.target_set == pg_cfg.target_set && if (pg_it->second.target_set == pg_cfg.target_set &&
pg_it->second.target_history == pg_cfg.target_history && pg_it->second.target_history == pg_cfg.target_history &&
@@ -963,13 +963,6 @@ void osd_t::report_pg_states()
} }
this->pgs.erase(pg_it); this->pgs.erase(pg_it);
} }
else if (pg_it->second.state & PG_PEERED)
{
// Activate PG after PG PEERED state is reported along with history
// (if the state wasn't changed again)
pg_it->second.state = pg_it->second.state & ~PG_PEERED | PG_ACTIVE;
report_pg_state(pg_it->second);
}
} }
} }
// Push other PG state updates, if any // Push other PG state updates, if any

View File

@@ -50,10 +50,6 @@ void osd_t::handle_peers()
still = true; still = true;
} }
} }
else if (p.second.state & PG_PEERED)
{
still = true;
}
} }
if (!still) if (!still)
{ {
@@ -74,10 +70,6 @@ void osd_t::handle_peers()
} }
still = true; still = true;
} }
else if (p.second.state & PG_PEERED)
{
still = true;
}
} }
if (!still) if (!still)
{ {
@@ -100,7 +92,7 @@ void osd_t::repeer_pgs(osd_num_t peer_osd)
{ {
auto & pg = p.second; auto & pg = p.second;
bool repeer = false; bool repeer = false;
if (pg.state & (PG_PEERING | PG_PEERED | PG_ACTIVE | PG_INCOMPLETE)) if (pg.state & (PG_PEERING | PG_ACTIVE | PG_INCOMPLETE))
{ {
for (osd_num_t pg_osd: pg.all_peers) for (osd_num_t pg_osd: pg.all_peers)
{ {

View File

@@ -88,13 +88,9 @@ void pg_obj_state_check_t::walk()
{ {
// Activate as degraded // Activate as degraded
// Current OSD set will be added into target_history on first write // Current OSD set will be added into target_history on first write
pg->state |= PG_DEGRADED | PG_PEERED; pg->state |= PG_DEGRADED;
}
else
{
// Just activate
pg->state |= PG_ACTIVE;
} }
pg->state |= PG_ACTIVE;
if (pg->state == PG_ACTIVE && pg->cur_peers.size() < pg->all_peers.size()) if (pg->state == PG_ACTIVE && pg->cur_peers.size() < pg->all_peers.size())
{ {
pg->state |= PG_LEFT_ON_DEAD; pg->state |= PG_LEFT_ON_DEAD;
@@ -460,11 +456,10 @@ void pg_t::calc_object_states(int log_level)
void pg_t::print_state() void pg_t::print_state()
{ {
printf( printf(
"[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num, "[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num,
(state & PG_STARTING) ? "starting" : "", (state & PG_STARTING) ? "starting" : "",
(state & PG_OFFLINE) ? "offline" : "", (state & PG_OFFLINE) ? "offline" : "",
(state & PG_PEERING) ? "peering" : "", (state & PG_PEERING) ? "peering" : "",
(state & PG_PEERED) ? "peered" : "",
(state & PG_INCOMPLETE) ? "incomplete" : "", (state & PG_INCOMPLETE) ? "incomplete" : "",
(state & PG_ACTIVE) ? "active" : "", (state & PG_ACTIVE) ? "active" : "",
(state & PG_REPEERING) ? "repeering" : "", (state & PG_REPEERING) ? "repeering" : "",

View File

@@ -54,5 +54,6 @@ int main(int argc, char *argv[])
{ {
printf("dev: state=%lx\n", it.second.state); printf("dev: state=%lx\n", it.second.state);
} }
delete pg.peering_state;
return 0; return 0;
} }

View File

@@ -297,7 +297,7 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
// Fail it immediately // Fail it immediately
subop->peer_fd = -1; subop->peer_fd = -1;
subop->reply.hdr.retval = -EPIPE; subop->reply.hdr.retval = -EPIPE;
subop->callback(subop); ringloop->set_immediate([subop]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
} }
subop_idx++; subop_idx++;
} }

View File

@@ -235,7 +235,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
// Fail it immediately // Fail it immediately
subop->peer_fd = -1; subop->peer_fd = -1;
subop->reply.hdr.retval = -EPIPE; subop->reply.hdr.retval = -EPIPE;
subop->callback(subop); ringloop->set_immediate([subop]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
} }
} }
i++; i++;
@@ -520,7 +520,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
// Fail it immediately // Fail it immediately
subops[i].peer_fd = -1; subops[i].peer_fd = -1;
subops[i].reply.hdr.retval = -EPIPE; subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]); ringloop->set_immediate([subop = &subops[i]]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
} }
} }
} }
@@ -635,7 +635,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
// Fail it immediately // Fail it immediately
subops[i].peer_fd = -1; subops[i].peer_fd = -1;
subops[i].reply.hdr.retval = -EPIPE; subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]); ringloop->set_immediate([subop = &subops[i]]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
} }
} }
} }

View File

@@ -881,7 +881,7 @@ void test15()
// Done // Done
free(rmw_buf); free(rmw_buf);
free(write_buf); free(write_buf);
use_ec(3, 2, false); use_ec(4, 2, false);
} }
/*** /***
@@ -984,5 +984,5 @@ void test16()
// Done // Done
free(rmw_buf); free(rmw_buf);
free(write_buf); free(write_buf);
use_ec(3, 2, false); use_ec(4, 2, false);
} }

View File

@@ -3,12 +3,11 @@
#include "pg_states.h" #include "pg_states.h"
const int pg_state_bit_count = 16; const int pg_state_bit_count = 14;
const int pg_state_bits[16] = { const int pg_state_bits[14] = {
PG_STARTING, PG_STARTING,
PG_PEERING, PG_PEERING,
PG_PEERED,
PG_INCOMPLETE, PG_INCOMPLETE,
PG_ACTIVE, PG_ACTIVE,
PG_REPEERING, PG_REPEERING,
@@ -23,10 +22,9 @@ const int pg_state_bits[16] = {
PG_LEFT_ON_DEAD, PG_LEFT_ON_DEAD,
}; };
const char *pg_state_names[16] = { const char *pg_state_names[14] = {
"starting", "starting",
"peering", "peering",
"peered",
"incomplete", "incomplete",
"active", "active",
"repeering", "repeering",

View File

@@ -4,27 +4,25 @@
#pragma once #pragma once
// Placement group states // Placement group states
// STARTING -> [acquire lock] -> PEERING -> PEERED // STARTING -> [acquire lock] -> PEERING -> INCOMPLETE|ACTIVE
// PEERED -> [report history if required!] -> INCOMPLETE|ACTIVE
// ACTIVE -> REPEERING -> PEERING // ACTIVE -> REPEERING -> PEERING
// ACTIVE -> STOPPING -> OFFLINE -> [release lock] // ACTIVE -> STOPPING -> OFFLINE -> [release lock]
// Exactly one of these: // Exactly one of these:
#define PG_STARTING (1<<0) #define PG_STARTING (1<<0)
#define PG_PEERING (1<<1) #define PG_PEERING (1<<1)
#define PG_PEERED (1<<2) #define PG_INCOMPLETE (1<<2)
#define PG_INCOMPLETE (1<<3) #define PG_ACTIVE (1<<3)
#define PG_ACTIVE (1<<4) #define PG_REPEERING (1<<4)
#define PG_REPEERING (1<<5) #define PG_STOPPING (1<<5)
#define PG_STOPPING (1<<6) #define PG_OFFLINE (1<<6)
#define PG_OFFLINE (1<<7)
// Plus any of these: // Plus any of these:
#define PG_DEGRADED (1<<8) #define PG_DEGRADED (1<<7)
#define PG_HAS_INCOMPLETE (1<<9) #define PG_HAS_INCOMPLETE (1<<8)
#define PG_HAS_DEGRADED (1<<10) #define PG_HAS_DEGRADED (1<<9)
#define PG_HAS_MISPLACED (1<<11) #define PG_HAS_MISPLACED (1<<10)
#define PG_HAS_UNCLEAN (1<<12) #define PG_HAS_UNCLEAN (1<<11)
#define PG_HAS_INVALID (1<<13) #define PG_HAS_INVALID (1<<12)
#define PG_LEFT_ON_DEAD (1<<14) #define PG_LEFT_ON_DEAD (1<<13)
// Lower bits that represent object role (EC 0/1/2... or always 0 with replication) // Lower bits that represent object role (EC 0/1/2... or always 0 with replication)
// 12 bits is a safe default that doesn't depend on pg_stripe_size or pg_block_size // 12 bits is a safe default that doesn't depend on pg_stripe_size or pg_block_size

View File

@@ -25,7 +25,6 @@ ring_loop_t::ring_loop_t(int qd)
{ {
free_ring_data[i] = i; free_ring_data[i] = i;
} }
wait_sqe_id = 1;
} }
ring_loop_t::~ring_loop_t() ring_loop_t::~ring_loop_t()
@@ -83,17 +82,19 @@ void ring_loop_t::loop()
} }
io_uring_cqe_seen(&ring, cqe); io_uring_cqe_seen(&ring, cqe);
} }
while (get_sqe_queue.size() > 0)
{
(get_sqe_queue[0].second)();
get_sqe_queue.erase(get_sqe_queue.begin());
}
do do
{ {
loop_again = false; loop_again = false;
for (int i = 0; i < consumers.size(); i++) for (int i = 0; i < consumers.size(); i++)
{ {
consumers[i]->loop(); consumers[i]->loop();
if (immediate_queue.size())
{
immediate_queue2.swap(immediate_queue);
for (auto & cb: immediate_queue2)
cb();
immediate_queue2.clear();
}
} }
} while (loop_again); } while (loop_again);
} }

View File

@@ -119,11 +119,10 @@ struct ring_consumer_t
class ring_loop_t class ring_loop_t
{ {
std::vector<std::pair<int,std::function<void()>>> get_sqe_queue; std::vector<std::function<void()>> immediate_queue, immediate_queue2;
std::vector<ring_consumer_t*> consumers; std::vector<ring_consumer_t*> consumers;
struct ring_data_t *ring_datas; struct ring_data_t *ring_datas;
int *free_ring_data; int *free_ring_data;
int wait_sqe_id;
unsigned free_ring_data_ptr; unsigned free_ring_data_ptr;
bool loop_again; bool loop_again;
struct io_uring ring; struct io_uring ring;
@@ -145,20 +144,9 @@ public:
} }
return sqe; return sqe;
} }
inline int wait_sqe(std::function<void()> cb) inline void set_immediate(const std::function<void()> cb)
{ {
get_sqe_queue.push_back({ wait_sqe_id, cb }); immediate_queue.push_back(cb);
return wait_sqe_id++;
}
inline void cancel_wait_sqe(int wait_id)
{
for (int i = 0; i < get_sqe_queue.size(); i++)
{
if (get_sqe_queue[i].first == wait_id)
{
get_sqe_queue.erase(get_sqe_queue.begin()+i, get_sqe_queue.begin()+i+1);
}
}
} }
inline int submit() inline int submit()
{ {

View File

@@ -8,7 +8,6 @@
void configure_single_pg_pool(cluster_client_t *cli) void configure_single_pg_pool(cluster_client_t *cli)
{ {
cli->st_cli.on_load_pgs_hook(true);
cli->st_cli.parse_state((etcd_kv_t){ cli->st_cli.parse_state((etcd_kv_t){
.key = "/config/pools", .key = "/config/pools",
.value = json11::Json::object { .value = json11::Json::object {
@@ -43,6 +42,7 @@ void configure_single_pg_pool(cluster_client_t *cli)
{ "state", json11::Json::array { "active" } }, { "state", json11::Json::array { "active" } },
}, },
}); });
cli->st_cli.on_load_pgs_hook(true);
std::map<std::string, etcd_kv_t> changes; std::map<std::string, etcd_kv_t> changes;
cli->st_cli.on_change_hook(changes); cli->st_cli.on_change_hook(changes);
} }
@@ -188,7 +188,6 @@ void test1()
int *r1 = test_write(cli, 0, 4096, 0x55); int *r1 = test_write(cli, 0, 4096, 0x55);
configure_single_pg_pool(cli); configure_single_pg_pool(cli);
pretend_connected(cli, 1); pretend_connected(cli, 1);
cli->continue_ops(true);
can_complete(r1); can_complete(r1);
check_op_count(cli, 1, 1); check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0); pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
@@ -196,8 +195,6 @@ void test1()
pretend_disconnected(cli, 1); pretend_disconnected(cli, 1);
int *r2 = test_sync(cli); int *r2 = test_sync(cli);
pretend_connected(cli, 1); pretend_connected(cli, 1);
check_op_count(cli, 1, 0);
cli->continue_ops(true);
check_op_count(cli, 1, 1); check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0); pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
check_op_count(cli, 1, 1); check_op_count(cli, 1, 1);
@@ -303,8 +300,6 @@ void test1()
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), -EPIPE); pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), -EPIPE);
check_disconnected(cli, 1); check_disconnected(cli, 1);
pretend_connected(cli, 1); pretend_connected(cli, 1);
check_op_count(cli, 1, 0);
cli->continue_ops(true);
check_op_count(cli, 1, 1); check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0); pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
check_op_count(cli, 1, 1); check_op_count(cli, 1, 1);