Compare commits

..

19 Commits

Author SHA1 Message Date
b4b6407716 WIP Implement RDMA v2 based on IBV_WR_RDMA_WRITE with remote buffer management
One BIG FIXME remaining - handling large operations :))
2023-02-26 00:26:39 +03:00
8139a34e97 Fix json11: allow trailing comma 2023-02-23 01:16:01 +03:00
4ab630b44d Use just sfdisk --json, --dump is not needed 2023-02-23 00:55:47 +03:00
2c8241b7db Remove PG "peered" state 2023-02-21 01:30:42 +03:00
36a7dd3671 Move tests to "make test" 2023-02-21 01:30:42 +03:00
936122bbcf Initialize msgr lazily in client to speedup vitastor-cli with RDMA enabled 2023-02-19 18:59:07 +03:00
1a1ba0d1e7 Add set_immediate to ringloop and use it for bs/osd ops to prevent reenterability issues 2023-02-09 17:37:26 +03:00
3d09c9cec7 Remove unused wait_sqe() from ringloop 2023-02-09 17:37:26 +03:00
3d08a1ad6c Fix cluster_client test after last reenterability fixes 2023-02-05 01:47:32 +03:00
499881d81c Fix typo 2023-01-27 01:52:02 +03:00
aba93b951b Fix incorrect EC free space statistics in vitastor-cli df output 2023-01-26 02:04:29 +03:00
d125fb1f30 Release 0.8.5
- Fix a possible "double free" bug in the client library happening on OSD restart
- Fix a possible write hang on PG history update when only epoch is changed
- Fix incorrect systemd target "local.target" in mon/make-etcd
- Allow "content" option in PVE storage plugin to allow to enable containers
- Build client library without tcmalloc which fixes "attempt to free invalid pointer"
  errors when, for example, trying to run QEMU with both Vitastor and Ceph RBD disks
2023-01-25 01:43:49 +03:00
9d3fd72298 Require liburing < 2 in rpm specs 2023-01-25 01:43:49 +03:00
8b552a01f9 Do not retry successful operation parts in client (could lead to "double free" bugs) 2023-01-25 01:30:36 +03:00
0385b2f9e8 Fix write hangs on PG epoch update - always set pg.history_changed to true 2023-01-25 01:30:15 +03:00
749c837045 Replace non-existing local.target with multi-user.target 2023-01-25 01:29:31 +03:00
98001d845b Remove version from vitastor-release.rpm links 2023-01-23 14:03:33 +03:00
c96bcae74b Allow "content" option in PVE storage plugin to allow to enable containers 2023-01-16 18:14:45 +03:00
9f4e34a8cc Build client library without tcmalloc
Fixes "[src/tcmalloc.cc:332] Attempt to free invalid pointer ..." when trying
to run QEMU with both Vitastor and Ceph RBD disks and other possible allocator
collisions.
2023-01-15 00:01:11 +03:00
52 changed files with 689 additions and 275 deletions

View File

@@ -2,6 +2,6 @@ cmake_minimum_required(VERSION 2.8)
project(vitastor)
set(VERSION "0.8.4")
set(VERSION "0.8.5")
add_subdirectory(src)

View File

@@ -48,9 +48,9 @@ Vitastor, составлены для того, чтобы убедиться,
интерфейс (прокси), опять же, без открытия в свободный публичный доступ как
самой программы, так и прокси.
Сетевая Публичная Лицензия Vitastor разработана специально чтобы
Сетевая Публичная Лицензия Vitastor разработана специально, чтобы
гарантировать, что в таких случаях и модифицированная версия программы, и
прокси оставались доступными сообществу. Для этого лицензия требует от
прокси останутся доступными сообществу. Для этого лицензия требует от
операторов сетевых серверов предоставлять исходный код оригинальной программы,
а также всех других программ, взаимодействующих с ней на их серверах,
пользователям этих серверов, на условиях свободных лицензий. Таким образом,

View File

@@ -1,4 +1,4 @@
VERSION ?= v0.8.4
VERSION ?= v0.8.5
all: build push

View File

@@ -49,7 +49,7 @@ spec:
capabilities:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
image: vitalif/vitastor-csi:v0.8.4
image: vitalif/vitastor-csi:v0.8.5
args:
- "--node=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)"

View File

@@ -116,7 +116,7 @@ spec:
privileged: true
capabilities:
add: ["SYS_ADMIN"]
image: vitalif/vitastor-csi:v0.8.4
image: vitalif/vitastor-csi:v0.8.5
args:
- "--node=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)"

View File

@@ -5,7 +5,7 @@ package vitastor
const (
vitastorCSIDriverName = "csi.vitastor.io"
vitastorCSIDriverVersion = "0.8.4"
vitastorCSIDriverVersion = "0.8.5"
)
// Config struct fills the parameters of request or user input

4
debian/changelog vendored
View File

@@ -1,10 +1,10 @@
vitastor (0.8.4-1) unstable; urgency=medium
vitastor (0.8.5-1) unstable; urgency=medium
* Bugfixes
-- Vitaliy Filippov <vitalif@yourcmc.ru> Fri, 03 Jun 2022 02:09:44 +0300
vitastor (0.8.4-1) unstable; urgency=medium
vitastor (0.8.5-1) unstable; urgency=medium
* Implement NFS proxy
* Add documentation

View File

@@ -34,8 +34,8 @@ RUN set -e -x; \
mkdir -p /root/packages/vitastor-$REL; \
rm -rf /root/packages/vitastor-$REL/*; \
cd /root/packages/vitastor-$REL; \
cp -r /root/vitastor vitastor-0.8.4; \
cd vitastor-0.8.4; \
cp -r /root/vitastor vitastor-0.8.5; \
cd vitastor-0.8.5; \
ln -s /root/fio-build/fio-*/ ./fio; \
FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
ls /usr/include/linux/raw.h || cp ./debian/raw.h /usr/include/linux/raw.h; \
@@ -48,8 +48,8 @@ RUN set -e -x; \
rm -rf a b; \
echo "dep:fio=$FIO" > debian/fio_version; \
cd /root/packages/vitastor-$REL; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.8.4.orig.tar.xz vitastor-0.8.4; \
cd vitastor-0.8.4; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.8.5.orig.tar.xz vitastor-0.8.5; \
cd vitastor-0.8.5; \
V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \

View File

@@ -20,8 +20,8 @@
## CentOS
- Add Vitastor package repository:
- CentOS 7: `yum install https://vitastor.io/rpms/centos/7/vitastor-release-1.0-1.el7.noarch.rpm`
- CentOS 8: `dnf install https://vitastor.io/rpms/centos/8/vitastor-release-1.0-1.el8.noarch.rpm`
- CentOS 7: `yum install https://vitastor.io/rpms/centos/7/vitastor-release.rpm`
- CentOS 8: `dnf install https://vitastor.io/rpms/centos/8/vitastor-release.rpm`
- Enable EPEL: `yum/dnf install epel-release`
- Enable additional CentOS repositories:
- CentOS 7: `yum install centos-release-scl`

View File

@@ -20,8 +20,8 @@
## CentOS
- Добавьте в систему репозиторий Vitastor:
- CentOS 7: `yum install https://vitastor.io/rpms/centos/7/vitastor-release-1.0-1.el7.noarch.rpm`
- CentOS 8: `dnf install https://vitastor.io/rpms/centos/8/vitastor-release-1.0-1.el8.noarch.rpm`
- CentOS 7: `yum install https://vitastor.io/rpms/centos/7/vitastor-release.rpm`
- CentOS 8: `dnf install https://vitastor.io/rpms/centos/8/vitastor-release.rpm`
- Включите EPEL: `yum/dnf install epel-release`
- Включите дополнительные репозитории CentOS:
- CentOS 7: `yum install centos-release-scl`

2
json11

Submodule json11 updated: 52a3af664f...fd37016cf8

View File

@@ -79,7 +79,7 @@ StartLimitInterval=0
RestartSec=10
[Install]
WantedBy=local.target
WantedBy=multi-user.target
`);
await system(`useradd etcd`);
await system(`systemctl daemon-reload`);

View File

@@ -261,7 +261,7 @@ const etcd_tree = {
/* <pool_id>: {
<pg_id>: {
primary: osd_num_t,
state: ("starting"|"peering"|"peered"|"incomplete"|"active"|"repeering"|"stopping"|"offline"|
state: ("starting"|"peering"|"incomplete"|"active"|"repeering"|"stopping"|"offline"|
"degraded"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"|
"has_invalid"|"left_on_dead")[],
}

View File

@@ -139,6 +139,7 @@ sub options
{
return {
shared => { optional => 1 },
content => { optional => 1 },
nodes => { optional => 1 },
disable => { optional => 1 },
vitastor_etcd_address => { optional => 1 },

View File

@@ -50,7 +50,7 @@ from cinder.volume import configuration
from cinder.volume import driver
from cinder.volume import volume_utils
VERSION = '0.8.4'
VERSION = '0.8.5'
LOG = logging.getLogger(__name__)

View File

@@ -25,4 +25,4 @@ rm fio
mv fio-copy fio
FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
tar --transform 's#^#vitastor-0.8.4/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.8.4$(rpm --eval '%dist').tar.gz *
tar --transform 's#^#vitastor-0.8.5/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.8.5$(rpm --eval '%dist').tar.gz *

View File

@@ -35,7 +35,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.8.4.el7.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.8.5.el7.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 0.8.4
Version: 0.8.5
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.8.4.el7.tar.gz
Source0: vitastor-0.8.5.el7.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel
@@ -35,6 +35,7 @@ Summary: Vitastor - OSD
Requires: libJerasure2
Requires: libisa-l
Requires: liburing >= 0.6
Requires: liburing < 2
Requires: vitastor-client = %{version}-%{release}
Requires: util-linux
Requires: parted
@@ -59,6 +60,7 @@ scheduling cluster-level operations.
%package -n vitastor-client
Summary: Vitastor - client
Requires: liburing >= 0.6
Requires: liburing < 2
%description -n vitastor-client

View File

@@ -35,7 +35,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.8.4.el8.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.8.5.el8.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 0.8.4
Version: 0.8.5
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.8.4.el8.tar.gz
Source0: vitastor-0.8.5.el8.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel
@@ -34,6 +34,7 @@ Summary: Vitastor - OSD
Requires: libJerasure2
Requires: libisa-l
Requires: liburing >= 0.6
Requires: liburing < 2
Requires: vitastor-client = %{version}-%{release}
Requires: util-linux
Requires: parted
@@ -57,6 +58,7 @@ scheduling cluster-level operations.
%package -n vitastor-client
Summary: Vitastor - client
Requires: liburing >= 0.6
Requires: liburing < 2
%description -n vitastor-client

View File

@@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 2.8)
project(vitastor)
include(GNUInstallDirs)
include(CTest)
set(WITH_QEMU false CACHE BOOL "Build QEMU driver inside Vitastor source tree")
set(WITH_FIO true CACHE BOOL "Build FIO driver")
@@ -15,7 +16,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}")
endif()
add_definitions(-DVERSION="0.8.4")
add_definitions(-DVERSION="0.8.5")
add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fdiagnostics-color=always -I ${CMAKE_SOURCE_DIR}/src)
if (${WITH_ASAN})
add_definitions(-fsanitize=address -fno-omit-frame-pointer)
@@ -55,6 +56,14 @@ if (ISAL_LIBRARIES)
add_definitions(-DWITH_ISAL)
endif (ISAL_LIBRARIES)
add_custom_target(build_tests)
add_custom_target(test
COMMAND
echo leak:tcmalloc > ${CMAKE_CURRENT_BINARY_DIR}/lsan-suppress.txt &&
env LSAN_OPTIONS=suppressions=${CMAKE_CURRENT_BINARY_DIR}/lsan-suppress.txt ${CMAKE_CTEST_COMMAND}
)
add_dependencies(test build_tests)
include_directories(
../
/usr/include/jerasure
@@ -89,7 +98,7 @@ endif (${WITH_FIO})
# libvitastor_common.a
set(MSGR_RDMA "")
if (IBVERBS_LIBRARIES)
set(MSGR_RDMA "msgr_rdma.cpp")
set(MSGR_RDMA msgr_rdma.cpp freelist.cpp allocator.cpp)
endif (IBVERBS_LIBRARIES)
add_library(vitastor_common STATIC
epoll_manager.cpp etcd_state_client.cpp messenger.cpp addr_util.cpp
@@ -145,7 +154,6 @@ add_library(vitastor_client SHARED
set_target_properties(vitastor_client PROPERTIES PUBLIC_HEADER "vitastor_c.h")
target_link_libraries(vitastor_client
vitastor_common
tcmalloc_minimal
${LIBURING_LIBRARIES}
${IBVERBS_LIBRARIES}
)
@@ -235,14 +243,17 @@ add_executable(osd_test osd_test.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(osd_test tcmalloc_minimal)
# osd_rmw_test
# FIXME: Move to tests
add_executable(osd_rmw_test osd_rmw_test.cpp allocator.cpp)
add_executable(osd_rmw_test EXCLUDE_FROM_ALL osd_rmw_test.cpp allocator.cpp)
target_link_libraries(osd_rmw_test Jerasure ${ISAL_LIBRARIES} tcmalloc_minimal)
add_dependencies(build_tests osd_rmw_test)
add_test(NAME osd_rmw_test COMMAND osd_rmw_test)
if (ISAL_LIBRARIES)
add_executable(osd_rmw_test_je osd_rmw_test.cpp allocator.cpp)
add_executable(osd_rmw_test_je EXCLUDE_FROM_ALL osd_rmw_test.cpp allocator.cpp)
target_compile_definitions(osd_rmw_test_je PUBLIC -DNO_ISAL)
target_link_libraries(osd_rmw_test_je Jerasure tcmalloc_minimal)
add_dependencies(build_tests osd_rmw_test_je)
add_test(NAME osd_rmw_test_jerasure COMMAND osd_rmw_test_je)
endif (ISAL_LIBRARIES)
# stub_uring_osd
@@ -257,11 +268,20 @@ target_link_libraries(stub_uring_osd
)
# osd_peering_pg_test
add_executable(osd_peering_pg_test osd_peering_pg_test.cpp osd_peering_pg.cpp)
add_executable(osd_peering_pg_test EXCLUDE_FROM_ALL osd_peering_pg_test.cpp osd_peering_pg.cpp)
target_link_libraries(osd_peering_pg_test tcmalloc_minimal)
add_dependencies(build_tests osd_peering_pg_test)
add_test(NAME osd_peering_pg_test COMMAND osd_peering_pg_test)
# test_allocator
add_executable(test_allocator test_allocator.cpp allocator.cpp)
add_executable(test_allocator EXCLUDE_FROM_ALL test_allocator.cpp allocator.cpp)
add_dependencies(build_tests test_allocator)
add_test(NAME test_allocator COMMAND test_allocator)
# test_freelist
add_executable(test_freelist EXCLUDE_FROM_ALL test_freelist.cpp)
add_dependencies(build_tests test_freelist)
add_test(NAME test_freelist COMMAND test_freelist)
# test_cas
add_executable(test_cas
@@ -281,12 +301,15 @@ target_link_libraries(test_crc32
# test_cluster_client
add_executable(test_cluster_client
EXCLUDE_FROM_ALL
test_cluster_client.cpp
pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
etcd_state_client.cpp timerfd_manager.cpp ../json11/json11.cpp
)
target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__)
target_include_directories(test_cluster_client PUBLIC ${CMAKE_SOURCE_DIR}/src/mock)
add_dependencies(build_tests test_cluster_client)
add_test(NAME test_cluster_client COMMAND test_cluster_client)
## test_blockstore, test_shit
#add_executable(test_blockstore test_blockstore.cpp)

View File

@@ -325,7 +325,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
{
// Basic verification not passed
op->retval = -EINVAL;
std::function<void (blockstore_op_t*)>(op->callback)(op);
ringloop->set_immediate([op]() { std::function<void (blockstore_op_t*)>(op->callback)(op); });
return;
}
if (op->opcode == BS_OP_SYNC_STAB_ALL)
@@ -368,7 +368,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
}
if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE) && !enqueue_write(op))
{
std::function<void (blockstore_op_t*)>(op->callback)(op);
ringloop->set_immediate([op]() { std::function<void (blockstore_op_t*)>(op->callback)(op); });
return;
}
// Call constructor without allocating memory. We'll call destructor before returning op back

View File

@@ -121,8 +121,7 @@ resume_1:
}
if (pool_cfg.scheme != POOL_SCHEME_REPLICATED)
{
uint64_t pg_real_size = pool_stats[pool_cfg.id]["pg_real_size"].uint64_value();
pool_avail = pg_real_size > 0 ? pool_avail * (pool_cfg.pg_size - pool_cfg.parity_chunks) / pg_real_size : 0;
pool_avail *= (pool_cfg.pg_size - pool_cfg.parity_chunks);
}
pool_stats[pool_cfg.id] = json11::Json::object {
{ "name", pool_cfg.name },

View File

@@ -92,6 +92,7 @@ struct rm_inode_t
void send_ops(rm_pg_t *cur_list)
{
parent->cli->init_msgr();
if (parent->cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) ==
parent->cli->msgr.osd_peer_fds.end())
{

View File

@@ -59,7 +59,6 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
delete op;
};
msgr.parse_config(this->config);
msgr.init();
st_cli.tfd = tfd;
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
@@ -73,17 +72,6 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
scrap_buffer_size = SCRAP_BUFFER_SIZE;
scrap_buffer = malloc_or_die(scrap_buffer_size);
if (ringloop)
{
consumer.loop = [this]()
{
msgr.read_requests();
msgr.send_replies();
this->ringloop->submit();
};
ringloop->register_consumer(&consumer);
}
}
cluster_client_t::~cluster_client_t()
@@ -115,6 +103,24 @@ cluster_op_t::~cluster_op_t()
}
}
void cluster_client_t::init_msgr()
{
if (msgr_initialized)
return;
msgr.init();
msgr_initialized = true;
if (ringloop)
{
consumer.loop = [this]()
{
msgr.read_requests();
msgr.send_replies();
this->ringloop->submit();
};
ringloop->register_consumer(&consumer);
}
}
void cluster_client_t::calc_wait(cluster_op_t *op)
{
op->prev_wait = 0;
@@ -223,11 +229,14 @@ void cluster_client_t::erase_op(cluster_op_t *op)
if (op_queue_tail == op)
op_queue_tail = op->prev;
op->next = op->prev = NULL;
if (flags & OP_FLUSH_BUFFER)
std::function<void(cluster_op_t*)>(op->callback)(op);
if (!(flags & OP_IMMEDIATE_COMMIT))
inc_wait(opcode, flags, next, -1);
// Call callback at the end to avoid inconsistencies in prev_wait
// if the callback adds more operations itself
std::function<void(cluster_op_t*)>(op->callback)(op);
if (!(flags & OP_FLUSH_BUFFER))
std::function<void(cluster_op_t*)>(op->callback)(op);
}
void cluster_client_t::continue_ops(bool up_retry)
@@ -757,7 +766,10 @@ resume_3:
{
for (int i = 0; i < op->parts.size(); i++)
{
op->parts[i].flags = PART_RETRY;
if (!(op->parts[i].flags & PART_DONE))
{
op->parts[i].flags = PART_RETRY;
}
}
goto resume_2;
}
@@ -915,6 +927,10 @@ bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len
bool cluster_client_t::try_send(cluster_op_t *op, int i)
{
if (!msgr_initialized)
{
init_msgr();
}
auto part = &op->parts[i];
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->cur_inode));
auto pg_it = pool_cfg.pg_config.find(part->pg_num);

View File

@@ -104,10 +104,14 @@ class cluster_client_t
std::vector<std::function<void(void)>> on_ready_hooks;
std::vector<inode_list_t*> lists;
int continuing_ops = 0;
bool msgr_initialized = false;
public:
etcd_state_client_t st_cli;
osd_messenger_t msgr;
void init_msgr();
json11::Json config;
json11::Json::object merged_config;

View File

@@ -305,10 +305,10 @@ int write_zero(int fd, uint64_t offset, uint64_t size)
json11::Json read_parttable(std::string dev)
{
std::string part_dump;
int r = shell_exec({ "sfdisk", "--dump", dev, "--json" }, "", &part_dump, NULL);
int r = shell_exec({ "sfdisk", "--json", dev }, "", &part_dump, NULL);
if (r == 255)
{
fprintf(stderr, "Error running sfdisk --dump %s --json\n", dev.c_str());
fprintf(stderr, "Error running sfdisk --json %s\n", dev.c_str());
return json11::Json(false);
}
// Decode partition table
@@ -319,7 +319,7 @@ json11::Json read_parttable(std::string dev)
pt = json11::Json::parse(part_dump, err);
if (err != "")
{
fprintf(stderr, "sfdisk --dump %s --json returned bad JSON: %s\n", dev.c_str(), part_dump.c_str());
fprintf(stderr, "sfdisk --json %s returned bad JSON: %s\n", dev.c_str(), part_dump.c_str());
return json11::Json(false);
}
pt = pt["partitiontable"];

63
src/freelist.cpp Normal file
View File

@@ -0,0 +1,63 @@
// Copyright (c) Vitaliy Filippov, 2023+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <assert.h>
#include "freelist.h"
uint64_t freelist_allocator_t::alloc(uint64_t data_size)
{
for (int i = 0; i < freelist.size(); i++)
{
if (freelist[i].size >= data_size)
{
uint64_t r = freelist[i].start;
freelist[i].start += data_size;
freelist[i].size -= data_size;
return r;
}
}
return UINT64_MAX;
}
void freelist_allocator_t::free(uint64_t start, uint64_t size)
{
int min = 0, max = freelist.size();
if (max && freelist[freelist.size()-1].start < start)
{
min = max;
}
if (max && freelist[0].start >= start)
{
max = 0;
}
while (max-min > 1)
{
int mid = (min+max)/2;
if (freelist[mid].start >= start)
max = mid;
else
min = mid;
}
// max = the first item where freelist[max].start >= start
if (max > 0 && freelist[max-1].start+freelist[max-1].size >= start)
{
assert(freelist[max-1].start+freelist[max-1].size == start);
freelist[max-1].size += size;
}
else if (max < freelist.size() && freelist[max].start <= size+start)
{
assert(freelist[max].start == size+start);
freelist[max].start -= size;
freelist[max].size += size;
}
else
{
freelist.insert(freelist.begin()+min, (freelist_item_t){ .start = start, .size = size });
max = min; // to skip the if below
}
if (min != max && max < freelist.size() && freelist[max].start == freelist[min].start+freelist[min].size)
{
freelist[min].size += freelist[max].size;
freelist.erase(freelist.begin()+max, freelist.begin()+max+1);
}
}

23
src/freelist.h Normal file
View File

@@ -0,0 +1,23 @@
// Copyright (c) Vitaliy Filippov, 2023+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#pragma once
#include <stdint.h>
#include <vector>
struct freelist_item_t
{
uint64_t start, size;
};
// Really trivial freelist allocator
// Should be fine for remote RDMA memory management because
// most of the time fragmentation shouldn't be an issue as all
// memory regions are short-lived
struct freelist_allocator_t
{
std::vector<freelist_item_t> freelist;
uint64_t alloc(uint64_t data_size);
void free(uint64_t start, uint64_t size);
};

View File

@@ -157,13 +157,16 @@ void osd_messenger_t::parse_config(const json11::Json & config)
this->rdma_max_sge = 128;
this->rdma_max_send = config["rdma_max_send"].uint64_value();
if (!this->rdma_max_send)
this->rdma_max_send = 1;
this->rdma_max_send = 128;
this->rdma_max_recv = config["rdma_max_recv"].uint64_value();
if (!this->rdma_max_recv)
this->rdma_max_recv = 128;
this->rdma_max_msg = config["rdma_max_msg"].uint64_value();
if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024)
this->rdma_max_msg = 129*1024;
this->rdma_op_slots = config["rdma_op_slots"].uint64_value();
if (!this->rdma_op_slots || this->rdma_op_slots >= 1024*1024)
this->rdma_op_slots = 4096;
this->rdma_op_memory = config["rdma_op_memory"].uint64_value();
if (!this->rdma_op_memory || this->rdma_op_memory >= 1024*1024*1024)
this->rdma_op_memory = 16*1024*1024;
#endif
this->receive_buffer_size = (uint32_t)config["tcp_header_buffer_size"].uint64_value();
if (!this->receive_buffer_size || this->receive_buffer_size > 1024*1024*1024)
@@ -388,12 +391,16 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
#ifdef WITH_RDMA
if (rdma_context)
{
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_op_slots, rdma_op_memory);
if (cl->rdma_conn)
{
clients_by_qp[cl->rdma_conn->qp->qp_num] = cl->peer_fd;
json11::Json payload = json11::Json::object {
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
{ "rdma_max_msg", cl->rdma_conn->max_msg },
{ "rdma_data_rkey", (uint64_t)cl->rdma_conn->in_data_mr->rkey },
{ "rdma_op_rkey", (uint64_t)cl->rdma_conn->in_op_mr->rkey },
{ "rdma_op_slots", cl->rdma_conn->op_slots },
{ "rdma_op_memory", cl->rdma_conn->op_memory },
};
std::string payload_str = payload.dump();
op->req.show_conf.json_len = payload_str.size();
@@ -453,12 +460,14 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
{
msgr_rdma_address_t addr;
if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) ||
config["rdma_op_memory"].uint64_value() == 0 ||
cl->rdma_conn->connect(&addr) != 0)
{
fprintf(
stderr, "Failed to connect to OSD %lu (address %s) using RDMA\n",
cl->osd_num, config["rdma_address"].string_value().c_str()
);
clients_by_qp.erase(cl->rdma_conn->qp->qp_num);
delete cl->rdma_conn;
cl->rdma_conn = NULL;
// FIXME: Keep TCP connection in this case
@@ -470,11 +479,12 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
}
else
{
uint64_t server_max_msg = config["rdma_max_msg"].uint64_value();
if (cl->rdma_conn->max_msg > server_max_msg)
{
cl->rdma_conn->max_msg = server_max_msg;
}
cl->rdma_conn->set_out_capacity(
config["rdma_data_rkey"].uint64_value(),
config["rdma_op_rkey"].uint64_value(),
config["rdma_op_slots"].uint64_value(),
config["rdma_op_memory"].uint64_value()
);
if (log_level > 0)
{
fprintf(stderr, "Connected to OSD %lu using RDMA\n", cl->osd_num);

View File

@@ -37,6 +37,7 @@
#define MSGR_SENDP_HDR 1
#define MSGR_SENDP_FREE 2
#define MSGR_SENDP_LAST 4
struct msgr_sendp_t
{
@@ -131,13 +132,15 @@ protected:
bool use_rdma = true;
std::string rdma_device;
uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0;
msgr_rdma_context_t *rdma_context = NULL;
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
uint64_t rdma_max_msg = 0;
uint64_t rdma_op_slots = 0, rdma_op_memory = 0;
msgr_rdma_context_t *rdma_context = NULL;
std::map<uint32_t, int> clients_by_qp;
#endif
std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients;
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
std::vector<std::function<void()>> set_immediate;
public:
@@ -169,7 +172,8 @@ public:
#ifdef WITH_RDMA
bool is_rdma_enabled();
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
bool connect_rdma(int peer_fd, std::string rdma_address,
uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory);
#endif
protected:
@@ -190,12 +194,13 @@ protected:
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
bool handle_finished_read(osd_client_t *cl);
void handle_op_hdr(osd_client_t *cl);
bool handle_reply_hdr(osd_client_t *cl);
bool handle_reply_hdr(void *reply_hdr, osd_client_t *cl);
void handle_reply_ready(osd_op_t *op);
#ifdef WITH_RDMA
bool try_send_rdma(osd_client_t *cl);
bool try_recv_rdma(osd_client_t *cl);
void handle_rdma_events();
bool rdma_handle_op(osd_client_t *cl, uint32_t op_slot);
#endif
};

View File

@@ -46,9 +46,20 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
ctx->used_max_cqe -= max_send+max_recv;
if (qp)
ibv_destroy_qp(qp);
if (recv_buffers.size())
for (auto b: recv_buffers)
free(b);
if (in_data_mr)
ibv_dereg_mr(in_data_mr);
if (in_op_mr)
ibv_dereg_mr(in_op_mr);
if (in_data_buf)
free(in_data_buf);
if (in_ops)
free(in_ops);
if (out_op_alloc)
delete out_op_alloc;
if (out_slot_data)
free(out_slot_data);
if (out_slot_ops)
free(out_slot_ops);
}
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level)
@@ -149,7 +160,7 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
if (!ctx->mr)
{
fprintf(stderr, "Couldn't register RDMA memory region\n");
fprintf(stderr, "Couldn't register global RDMA memory region: %s\n", strerror(errno));
goto cleanup;
}
@@ -180,7 +191,7 @@ cleanup:
}
msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send,
uint32_t max_recv, uint32_t max_sge, uint32_t max_msg)
uint32_t max_recv, uint32_t max_sge, uint64_t op_slots, uint64_t op_memory)
{
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
@@ -190,7 +201,6 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
conn->max_send = max_send;
conn->max_recv = max_recv;
conn->max_sge = max_sge;
conn->max_msg = max_msg;
ctx->used_max_cqe += max_send+max_recv;
if (ctx->used_max_cqe > ctx->max_cqe)
@@ -211,6 +221,30 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
ctx->max_cqe = new_max_cqe;
}
conn->op_memory = op_memory;
conn->in_data_buf = memalign_or_die(MEM_ALIGNMENT, op_memory);
conn->in_data_mr = ibv_reg_mr(ctx->pd, conn->in_data_buf, op_memory,
IBV_ACCESS_ZERO_BASED | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_ON_DEMAND);
if (!conn->in_data_mr)
{
fprintf(stderr, "Couldn't register %lu MB RDMA memory region for incoming data: %s\n",
(op_memory+1024*1024-1)/1024/1024, strerror(errno));
delete conn;
return NULL;
}
conn->op_slots = op_slots;
conn->in_ops = (msgr_rdma_cmd_t *)malloc_or_die(sizeof(msgr_rdma_cmd_t) * op_slots);
conn->in_op_mr = ibv_reg_mr(ctx->pd, conn->in_ops, sizeof(msgr_rdma_cmd_t) * op_slots,
IBV_ACCESS_ZERO_BASED | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_ON_DEMAND);
if (!conn->in_op_mr)
{
fprintf(stderr, "Couldn't register %lu KB RDMA memory region for incoming operation headers: %s\n",
(sizeof(msgr_rdma_cmd_t) * op_slots + 1023)/1024, strerror(errno));
delete conn;
return NULL;
}
ibv_qp_init_attr init_attr = {
.send_cq = ctx->cq,
.recv_cq = ctx->cq,
@@ -237,7 +271,7 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
ibv_qp_attr attr = {
.qp_state = IBV_QPS_INIT,
.qp_access_flags = 0,
.qp_access_flags = IBV_ACCESS_REMOTE_WRITE,
.pkey_index = 0,
.port_num = ctx->ib_port,
};
@@ -265,6 +299,19 @@ static ibv_mtu mtu_to_ibv_mtu(uint32_t mtu)
return IBV_MTU_4096;
}
void msgr_rdma_connection_t::set_out_capacity(uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory)
{
assert(!out_op_alloc);
this->out_data_rkey = out_data_rkey;
this->out_op_rkey = out_op_rkey;
this->out_op_slots = out_op_slots;
this->out_op_memory = out_op_memory;
out_op_alloc = new allocator(out_op_slots);
out_data_alloc.free(0, out_op_memory);
out_slot_data = (msgr_rdma_out_pos_t *)malloc_or_die(sizeof(msgr_rdma_out_pos_t) * out_op_slots);
out_slot_ops = (osd_op_t **)malloc_or_die(sizeof(osd_op_t *) * out_op_slots);
}
int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest)
{
auto conn = this;
@@ -311,17 +358,14 @@ int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest)
return 0;
}
bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg)
bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address,
uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory)
{
// Try to connect to the peer using RDMA
msgr_rdma_address_t addr;
if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr))
{
if (client_max_msg > rdma_max_msg)
{
client_max_msg = rdma_max_msg;
}
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg);
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_op_slots, rdma_op_memory);
if (rdma_conn)
{
int r = rdma_conn->connect(&addr);
@@ -336,6 +380,8 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64
else
{
// Remember connection, but switch to RDMA only after sending the configuration response
clients_by_qp[rdma_conn->qp->qp_num] = peer_fd;
rdma_conn->set_out_capacity(out_data_rkey, out_op_rkey, out_op_slots, out_op_memory);
auto cl = clients.at(peer_fd);
cl->rdma_conn = rdma_conn;
cl->peer_state = PEER_RDMA_CONNECTING;
@@ -346,66 +392,161 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64
return false;
}
static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
{
ibv_send_wr *bad_wr = NULL;
ibv_send_wr wr = {
.wr_id = (uint64_t)(cl->peer_fd*2+1),
.sg_list = sge,
.num_sge = op_sge,
.opcode = IBV_WR_SEND,
.send_flags = IBV_SEND_SIGNALED,
};
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
if (err || bad_wr)
{
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
exit(1);
}
cl->rdma_conn->cur_send++;
}
bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
{
auto rc = cl->rdma_conn;
if (!cl->send_list.size() || rc->cur_send > 0)
if (!cl->send_list.size() && !rc->in_slots_freed.size() || rc->cur_send >= rc->max_send)
{
// Only send one batch at a time
return true;
}
uint64_t op_size = 0, op_sge = 0;
ibv_sge sge[rc->max_sge];
while (rc->send_pos < cl->send_list.size())
int i = 0;
while (i < rc->in_slots_freed.size())
{
iovec & iov = cl->send_list[rc->send_pos];
if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
{
try_send_rdma_wr(cl, sge, op_sge);
op_sge = 0;
op_size = 0;
if (rc->cur_send >= rc->max_send)
{
break;
}
}
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg
? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size);
sge[op_sge++] = {
.addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos),
.length = len,
.lkey = rc->ctx->mr->lkey,
auto op_slot = rc->in_slots_freed[i++];
assert(op_slot < 0x80000000);
ibv_send_wr *bad_wr = NULL;
ibv_send_wr wr = {
.wr_id = 0,
.opcode = IBV_WR_RDMA_WRITE_WITH_IMM,
.imm_data = 0x80000000 | op_slot,
};
op_size += len;
rc->send_buf_pos += len;
if (rc->send_buf_pos >= iov.iov_len)
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
if (err || bad_wr)
{
rc->send_pos++;
rc->send_buf_pos = 0;
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
exit(1);
}
rc->cur_send++;
if (rc->cur_send >= rc->max_send)
{
break;
}
}
if (op_sge > 0)
rc->in_slots_freed.erase(rc->in_slots_freed.begin(), rc->in_slots_freed.begin()+i);
if (!cl->send_list.size() || rc->cur_send >= rc->max_send)
{
try_send_rdma_wr(cl, sge, op_sge);
return true;
}
ibv_sge sge[rc->max_sge];
int op_start = 0;
while (op_start < cl->send_list.size())
{
uint64_t op_data_size = 0;
int op_end = op_start;
while (!(cl->outbox[op_end].flags & MSGR_SENDP_LAST))
{
op_data_size += cl->send_list[op_end].iov_len;
op_end++;
}
op_data_size += cl->send_list[op_end].iov_len;
op_end++;
op_data_size -= cl->send_list[op_start].iov_len;
// Operation boundaries in send_list: op_start..op_end, first iovec is the header
uint64_t op_slot = rc->out_op_alloc->find_free();
if (op_slot == UINT64_MAX)
{
// op queue is full
return true;
}
uint64_t data_pos = UINT64_MAX;
if (op_data_size >= 0)
{
if (rc->cur_send > rc->max_send-1-(op_end-op_start-1+rc->max_sge)/rc->max_sge)
{
// RDMA queue is full
return true;
}
// FIXME: Oops, and what if op data is larger than the whole buffer... :)
data_pos = rc->out_data_alloc.alloc(op_data_size);
if (data_pos == UINT64_MAX)
{
// data buffers are full
return true;
}
int cur_sge = 0;
for (int data_sent = 1; data_sent < op_end; data_sent++)
{
sge[cur_sge++] = {
.addr = (uintptr_t)cl->send_list[data_sent].iov_base,
.length = (uint32_t)cl->send_list[data_sent].iov_len,
.lkey = rc->ctx->mr->lkey,
};
if (data_sent == op_end-1 || cur_sge >= rc->max_sge)
{
ibv_send_wr *bad_wr = NULL;
ibv_send_wr wr = {
.wr_id = op_slot,
.next = NULL,
.sg_list = sge,
.num_sge = cur_sge,
.opcode = IBV_WR_RDMA_WRITE,
.send_flags = 0,
.wr = {
.rdma = {
.remote_addr = data_pos,
.rkey = rc->out_data_rkey,
},
},
};
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
if (err || bad_wr)
{
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
exit(1);
}
rc->cur_send++;
cur_sge = 0;
}
}
}
if (rc->cur_send > rc->max_send-1)
{
// RDMA queue is full
return true;
}
rc->out_op_alloc->set(op_slot, true);
assert(cl->send_list[op_start].iov_len == OSD_PACKET_SIZE);
sge[0] = {
.addr = (uintptr_t)cl->send_list[op_start].iov_base,
.length = (uint32_t)cl->send_list[op_start].iov_len,
.lkey = rc->ctx->mr->lkey,
};
rc->out_slot_data[op_slot] = { .data_pos = data_pos, .data_size = op_data_size };
rc->out_slot_ops[op_slot] = (cl->outbox[op_end-1].flags & MSGR_SENDP_FREE)
? cl->outbox[op_end-1].op : NULL;
sge[1] = {
.addr = (uintptr_t)(rc->out_slot_data+op_slot),
.length = sizeof(rc->out_slot_data[op_slot]),
.lkey = rc->ctx->mr->lkey,
};
ibv_send_wr *bad_wr = NULL;
ibv_send_wr wr = {
.wr_id = op_slot,
.next = NULL,
.sg_list = sge,
.num_sge = 2,
.opcode = IBV_WR_RDMA_WRITE_WITH_IMM,
.send_flags = IBV_SEND_SIGNALED,
.imm_data = (uint32_t)op_slot,
.wr = {
.rdma = {
.remote_addr = op_slot*sizeof(msgr_rdma_cmd_t),
.rkey = rc->out_op_rkey,
},
},
};
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
if (err || bad_wr)
{
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
exit(1);
}
rc->cur_send++;
op_start = op_end;
}
if (op_start > 0)
{
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_start);
}
return true;
}
@@ -427,23 +568,87 @@ static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
cl->rdma_conn->cur_recv++;
}
static void copy_data_to_recv_list(uint8_t *data_buf, uint64_t data_size, osd_client_t *cl)
{
uint64_t pos = 0;
while (cl->recv_list.done < cl->recv_list.count)
{
uint64_t cur = cl->recv_list.buf[cl->recv_list.done].iov_len;
assert(cur <= data_size-pos);
memcpy(cl->recv_list.buf[cl->recv_list.done].iov_base, data_buf+pos, cur);
pos += cur;
}
cl->recv_list.reset();
}
bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
{
auto rc = cl->rdma_conn;
while (rc->cur_recv < rc->max_recv)
{
void *buf = malloc_or_die(rc->max_msg);
rc->recv_buffers.push_back(buf);
ibv_sge sge = {
.addr = (uintptr_t)buf,
.length = (uint32_t)rc->max_msg,
.lkey = rc->ctx->mr->lkey,
};
try_recv_rdma_wr(cl, &sge, 1);
try_recv_rdma_wr(cl, NULL, 0);
}
return true;
}
bool osd_messenger_t::rdma_handle_op(osd_client_t *cl, uint32_t op_slot)
{
auto rc = cl->rdma_conn;
if (op_slot >= rc->in_op_cap)
{
// Invalid incoming index
fprintf(stderr, "Client %d invalid incoming RDMA op slot: %u, dropping connection\n", cl->peer_fd, op_slot);
stop_client(cl->peer_fd);
return false;
}
osd_op_header_t *hdr = (osd_op_header_t *)rc->in_ops[op_slot].header;
uint8_t *data_buf = (uint8_t*)rc->in_data_buf + rc->in_ops[op_slot].pos.data_pos;
uint64_t data_size = rc->in_ops[op_slot].pos.data_size;
if (hdr->magic == SECONDARY_OSD_REPLY_MAGIC)
{
// Reply
if (cl->read_op)
{
delete cl->read_op;
cl->read_op = NULL;
}
if (!handle_reply_hdr(rc->in_ops[op_slot].header, cl))
return false;
if (cl->read_state == CL_READ_REPLY_DATA)
{
// copy reply data to cl->recv_list
copy_data_to_recv_list(data_buf, data_size, cl);
// and handle reply with data
handle_reply_ready(cl->read_op);
cl->read_op = NULL;
cl->read_state = 0;
cl->read_remaining = 0;
}
}
else
{
// Operation
cl->read_op = new osd_op_t;
cl->read_op->peer_fd = cl->peer_fd;
cl->read_op->op_type = OSD_OP_IN;
memcpy(&cl->read_op->req, hdr, OSD_PACKET_SIZE);
handle_op_hdr(cl);
if (cl->read_state == CL_READ_DATA)
{
copy_data_to_recv_list(data_buf, data_size, cl);
// And handle the incoming op with data
cl->received_ops.push_back(cl->read_op);
set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); });
cl->read_op = NULL;
cl->read_state = 0;
}
}
// We don't need the incoming data buffer anymore, notify peer about it
// FIXME: Allow to pass memory to the internal layer without copying and notify after handling it
rc->in_slots_freed.push_back(op_slot);
return true;
}
#define RDMA_EVENTS_AT_ONCE 32
void osd_messenger_t::handle_rdma_events()
@@ -468,9 +673,9 @@ void osd_messenger_t::handle_rdma_events()
event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc);
for (int i = 0; i < event_count; i++)
{
int client_id = wc[i].wr_id >> 1;
bool is_send = wc[i].wr_id & 1;
auto cl_it = clients.find(client_id);
auto cqp_it = clients_by_qp.find(wc[i].qp_num);
int peer_fd = cqp_it != clients_by_qp.end() ? cqp_it->second : -1;
auto cl_it = clients.find(peer_fd);
if (cl_it == clients.end())
{
continue;
@@ -478,55 +683,51 @@ void osd_messenger_t::handle_rdma_events()
osd_client_t *cl = cl_it->second;
if (wc[i].status != IBV_WC_SUCCESS)
{
fprintf(stderr, "RDMA work request failed for client %d", client_id);
fprintf(stderr, "RDMA work request failed for client %d", peer_fd);
if (cl->osd_num)
{
fprintf(stderr, " (OSD %lu)", cl->osd_num);
}
fprintf(stderr, " with status: %s, stopping client\n", ibv_wc_status_str(wc[i].status));
stop_client(client_id);
if (peer_fd >= 0)
stop_client(peer_fd);
continue;
}
if (!is_send)
auto rc = cl->rdma_conn;
if (wc[i].opcode == IBV_WC_RDMA_WRITE)
{
cl->rdma_conn->cur_recv--;
if (!handle_read_buffer(cl, cl->rdma_conn->recv_buffers[0], wc[i].byte_len))
// Operation or reply is sent, we can free it
auto & op = rc->out_slot_ops[wc[i].wr_id];
if (op)
{
// handle_read_buffer may stop the client
continue;
delete op;
op = NULL;
}
free(cl->rdma_conn->recv_buffers[0]);
cl->rdma_conn->recv_buffers.erase(cl->rdma_conn->recv_buffers.begin(), cl->rdma_conn->recv_buffers.begin()+1);
try_recv_rdma(cl);
rc->cur_send--;
try_send_rdma(cl);
}
else
else if (wc[i].opcode == IBV_WC_RECV)
{
cl->rdma_conn->cur_send--;
if (!cl->rdma_conn->cur_send)
if (!(wc[i].imm_data & 0x80000000))
{
// Wait for the whole batch
for (int i = 0; i < cl->rdma_conn->send_pos; i++)
// Operation or reply received. Handle it
if (!rdma_handle_op(cl, wc[i].imm_data))
{
if (cl->outbox[i].flags & MSGR_SENDP_FREE)
{
// Reply fully sent
delete cl->outbox[i].op;
}
// false means that the client is stopped due to invalid operation
continue;
}
if (cl->rdma_conn->send_pos > 0)
{
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos);
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos);
cl->rdma_conn->send_pos = 0;
}
if (cl->rdma_conn->send_buf_pos > 0)
{
cl->send_list[0].iov_base = (uint8_t*)cl->send_list[0].iov_base + cl->rdma_conn->send_buf_pos;
cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos;
cl->rdma_conn->send_buf_pos = 0;
}
try_send_rdma(cl);
rc->cur_recv--;
try_recv_rdma(cl);
}
else
{
// Outbox slot is marked as free (the remote side doesn't need it anymore)
uint32_t op_slot = wc[i].imm_data & 0x7FFFFFFF;
auto & pos = rc->in_ops[op_slot].pos;
if (pos.data_size > 0)
rc->out_data_alloc.free(pos.data_pos, pos.data_size);
rc->out_op_alloc->set(op_slot, false);
}
// Try to continue sending
try_send_rdma(cl);
}
}
} while (event_count > 0);

View File

@@ -5,6 +5,11 @@
#include <infiniband/verbs.h>
#include <string>
#include <vector>
#include "allocator.h"
#include "freelist.h"
#include "osd_ops.h"
struct osd_op_t;
struct msgr_rdma_address_t
{
@@ -39,6 +44,17 @@ struct msgr_rdma_context_t
~msgr_rdma_context_t();
};
struct msgr_rdma_out_pos_t
{
uint64_t data_pos, data_size;
};
struct msgr_rdma_cmd_t
{
uint8_t header[OSD_PACKET_SIZE];
msgr_rdma_out_pos_t pos;
};
struct msgr_rdma_connection_t
{
msgr_rdma_context_t *ctx = NULL;
@@ -46,13 +62,24 @@ struct msgr_rdma_connection_t
msgr_rdma_address_t addr;
int max_send = 0, max_recv = 0, max_sge = 0;
int cur_send = 0, cur_recv = 0;
uint64_t max_msg = 0;
uint64_t op_slots = 0, op_memory = 0;
int send_pos = 0, send_buf_pos = 0;
int recv_pos = 0, recv_buf_pos = 0;
std::vector<void*> recv_buffers;
ibv_mr *in_data_mr = NULL, *in_op_mr = NULL;
msgr_rdma_cmd_t *in_ops = NULL;
int in_op_cap = 0;
void *in_data_buf = NULL;
std::vector<uint32_t> in_slots_freed;
uint32_t out_data_rkey = 0, out_op_rkey = 0;
uint64_t out_op_slots = 0, out_op_memory = 0;
allocator *out_op_alloc = NULL;
freelist_allocator_t out_data_alloc;
msgr_rdma_out_pos_t *out_slot_data = NULL;
osd_op_t **out_slot_ops = NULL;
~msgr_rdma_connection_t();
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send,
uint32_t max_recv, uint32_t max_sge, uint64_t op_slots, uint64_t op_memory);
int connect(msgr_rdma_address_t *dest);
void set_out_capacity(uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory);
};

View File

@@ -172,7 +172,7 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
if (cl->read_state == CL_READ_HDR)
{
if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
return handle_reply_hdr(cl);
return handle_reply_hdr(cl->read_op->req.buf, cl);
else if (cl->read_op->req.hdr.magic == SECONDARY_OSD_OP_MAGIC)
handle_op_hdr(cl);
else
@@ -286,7 +286,7 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
}
}
bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
bool osd_messenger_t::handle_reply_hdr(void *reply_hdr, osd_client_t *cl)
{
auto req_it = cl->sent_ops.find(cl->read_op->req.hdr.id);
if (req_it == cl->sent_ops.end())
@@ -297,7 +297,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
return false;
}
osd_op_t *op = req_it->second;
memcpy(op->reply.buf, cl->read_op->req.buf, OSD_PACKET_SIZE);
memcpy(op->reply.buf, reply_hdr, OSD_PACKET_SIZE);
cl->sent_ops.erase(req_it);
if (op->reply.hdr.opcode == OSD_OP_SEC_READ || op->reply.hdr.opcode == OSD_OP_READ)
{
@@ -328,14 +328,16 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
{
goto reuse;
}
delete cl->read_op;
if (cl->read_op)
delete cl->read_op;
cl->read_op = op;
cl->read_state = CL_READ_REPLY_DATA;
}
else if (op->reply.hdr.opcode == OSD_OP_SEC_LIST && op->reply.hdr.retval > 0)
{
assert(!op->iov.count);
delete cl->read_op;
if (cl->read_op)
delete cl->read_op;
cl->read_op = op;
cl->read_state = CL_READ_REPLY_DATA;
cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval;
@@ -345,7 +347,8 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
else if (op->reply.hdr.opcode == OSD_OP_SEC_READ_BMP && op->reply.hdr.retval > 0)
{
assert(!op->iov.count);
delete cl->read_op;
if (cl->read_op)
delete cl->read_op;
cl->read_op = op;
cl->read_state = CL_READ_REPLY_DATA;
cl->read_remaining = op->reply.hdr.retval;
@@ -355,7 +358,8 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
}
else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0)
{
delete cl->read_op;
if (cl->read_op)
delete cl->read_op;
cl->read_op = op;
cl->read_state = CL_READ_REPLY_DATA;
cl->read_remaining = op->reply.hdr.retval;
@@ -368,7 +372,8 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
reuse:
// It's fine to reuse cl->read_op for the next reply
handle_reply_ready(op);
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
if (cl->read_op)
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
cl->read_remaining = OSD_PACKET_SIZE;
cl->read_state = CL_READ_HDR;
}

View File

@@ -96,6 +96,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->req.sec_read_bmp.len });
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
}
to_outbox[to_outbox.size()-1].flags |= MSGR_SENDP_LAST;
if (cur_op->op_type == OSD_OP_IN)
{
to_outbox[to_outbox.size()-1].flags |= MSGR_SENDP_FREE;

View File

@@ -129,6 +129,7 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
#ifdef WITH_RDMA
if (cl->rdma_conn)
{
clients_by_qp.erase(cl->rdma_conn->qp->qp_num);
delete cl->rdma_conn;
}
#endif

View File

@@ -683,7 +683,7 @@ void osd_t::apply_pg_config()
auto vec_all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end());
if (currently_taken)
{
if (pg_it->second.state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING | PG_REPEERING | PG_PEERED))
if (pg_it->second.state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING | PG_REPEERING))
{
if (pg_it->second.target_set == pg_cfg.target_set &&
pg_it->second.target_history == pg_cfg.target_history &&
@@ -963,13 +963,6 @@ void osd_t::report_pg_states()
}
this->pgs.erase(pg_it);
}
else if (pg_it->second.state & PG_PEERED)
{
// Activate PG after PG PEERED state is reported along with history
// (if the state wasn't changed again)
pg_it->second.state = pg_it->second.state & ~PG_PEERED | PG_ACTIVE;
report_pg_state(pg_it->second);
}
}
}
// Push other PG state updates, if any

View File

@@ -50,10 +50,6 @@ void osd_t::handle_peers()
still = true;
}
}
else if (p.second.state & PG_PEERED)
{
still = true;
}
}
if (!still)
{
@@ -74,10 +70,6 @@ void osd_t::handle_peers()
}
still = true;
}
else if (p.second.state & PG_PEERED)
{
still = true;
}
}
if (!still)
{
@@ -100,7 +92,7 @@ void osd_t::repeer_pgs(osd_num_t peer_osd)
{
auto & pg = p.second;
bool repeer = false;
if (pg.state & (PG_PEERING | PG_PEERED | PG_ACTIVE | PG_INCOMPLETE))
if (pg.state & (PG_PEERING | PG_ACTIVE | PG_INCOMPLETE))
{
for (osd_num_t pg_osd: pg.all_peers)
{

View File

@@ -88,13 +88,9 @@ void pg_obj_state_check_t::walk()
{
// Activate as degraded
// Current OSD set will be added into target_history on first write
pg->state |= PG_DEGRADED | PG_PEERED;
}
else
{
// Just activate
pg->state |= PG_ACTIVE;
pg->state |= PG_DEGRADED;
}
pg->state |= PG_ACTIVE;
if (pg->state == PG_ACTIVE && pg->cur_peers.size() < pg->all_peers.size())
{
pg->state |= PG_LEFT_ON_DEAD;
@@ -460,11 +456,10 @@ void pg_t::calc_object_states(int log_level)
void pg_t::print_state()
{
printf(
"[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num,
"[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num,
(state & PG_STARTING) ? "starting" : "",
(state & PG_OFFLINE) ? "offline" : "",
(state & PG_PEERING) ? "peering" : "",
(state & PG_PEERED) ? "peered" : "",
(state & PG_INCOMPLETE) ? "incomplete" : "",
(state & PG_ACTIVE) ? "active" : "",
(state & PG_REPEERING) ? "repeering" : "",

View File

@@ -54,5 +54,6 @@ int main(int argc, char *argv[])
{
printf("dev: state=%lx\n", it.second.state);
}
delete pg.peering_state;
return 0;
}

View File

@@ -297,7 +297,7 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
// Fail it immediately
subop->peer_fd = -1;
subop->reply.hdr.retval = -EPIPE;
subop->callback(subop);
ringloop->set_immediate([subop]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
}
subop_idx++;
}

View File

@@ -235,7 +235,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
// Fail it immediately
subop->peer_fd = -1;
subop->reply.hdr.retval = -EPIPE;
subop->callback(subop);
ringloop->set_immediate([subop]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
}
}
i++;
@@ -520,7 +520,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
// Fail it immediately
subops[i].peer_fd = -1;
subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]);
ringloop->set_immediate([subop = &subops[i]]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
}
}
}
@@ -635,7 +635,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
// Fail it immediately
subops[i].peer_fd = -1;
subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]);
ringloop->set_immediate([subop = &subops[i]]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
}
}
}

View File

@@ -168,8 +168,8 @@ resume_3:
auto it = std::lower_bound(pg.target_history.begin(), pg.target_history.end(), history_set);
if (it == pg.target_history.end() || *it != history_set)
pg.target_history.insert(it, history_set);
pg.history_changed = true;
}
pg.history_changed = true;
report_pg_states();
resume_10:
if (pg.epoch > pg.reported_epoch)

View File

@@ -881,7 +881,7 @@ void test15()
// Done
free(rmw_buf);
free(write_buf);
use_ec(3, 2, false);
use_ec(4, 2, false);
}
/***
@@ -984,5 +984,5 @@ void test16()
// Done
free(rmw_buf);
free(write_buf);
use_ec(3, 2, false);
use_ec(4, 2, false);
}

View File

@@ -166,15 +166,20 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
{
// Indicate that RDMA is enabled
wire_config["rdma_enabled"] = true;
if (req_json["connect_rdma"].is_string())
if (req_json["connect_rdma"].is_string() && req_json["rdma_op_memory"].uint64_value() != 0)
{
// Peer is trying to connect using RDMA, try to satisfy him
bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value(), req_json["rdma_max_msg"].uint64_value());
bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value(),
req_json["rdma_data_rkey"].uint64_value(), req_json["rdma_op_rkey"].uint64_value(),
req_json["rdma_op_slots"].uint64_value(), req_json["rdma_op_memory"].uint64_value());
if (ok)
{
auto rc = msgr.clients.at(cur_op->peer_fd)->rdma_conn;
wire_config["rdma_address"] = rc->addr.to_string();
wire_config["rdma_max_msg"] = rc->max_msg;
wire_config["rdma_data_rkey"] = (uint64_t)rc->in_data_mr->rkey;
wire_config["rdma_op_rkey"] = (uint64_t)rc->in_op_mr->rkey;
wire_config["rdma_op_slots"] = rc->op_slots;
wire_config["rdma_op_memory"] = rc->op_memory;
}
}
}

View File

@@ -3,12 +3,11 @@
#include "pg_states.h"
const int pg_state_bit_count = 16;
const int pg_state_bit_count = 14;
const int pg_state_bits[16] = {
const int pg_state_bits[14] = {
PG_STARTING,
PG_PEERING,
PG_PEERED,
PG_INCOMPLETE,
PG_ACTIVE,
PG_REPEERING,
@@ -23,10 +22,9 @@ const int pg_state_bits[16] = {
PG_LEFT_ON_DEAD,
};
const char *pg_state_names[16] = {
const char *pg_state_names[14] = {
"starting",
"peering",
"peered",
"incomplete",
"active",
"repeering",

View File

@@ -4,27 +4,25 @@
#pragma once
// Placement group states
// STARTING -> [acquire lock] -> PEERING -> PEERED
// PEERED -> [report history if required!] -> INCOMPLETE|ACTIVE
// STARTING -> [acquire lock] -> PEERING -> INCOMPLETE|ACTIVE
// ACTIVE -> REPEERING -> PEERING
// ACTIVE -> STOPPING -> OFFLINE -> [release lock]
// Exactly one of these:
#define PG_STARTING (1<<0)
#define PG_PEERING (1<<1)
#define PG_PEERED (1<<2)
#define PG_INCOMPLETE (1<<3)
#define PG_ACTIVE (1<<4)
#define PG_REPEERING (1<<5)
#define PG_STOPPING (1<<6)
#define PG_OFFLINE (1<<7)
#define PG_INCOMPLETE (1<<2)
#define PG_ACTIVE (1<<3)
#define PG_REPEERING (1<<4)
#define PG_STOPPING (1<<5)
#define PG_OFFLINE (1<<6)
// Plus any of these:
#define PG_DEGRADED (1<<8)
#define PG_HAS_INCOMPLETE (1<<9)
#define PG_HAS_DEGRADED (1<<10)
#define PG_HAS_MISPLACED (1<<11)
#define PG_HAS_UNCLEAN (1<<12)
#define PG_HAS_INVALID (1<<13)
#define PG_LEFT_ON_DEAD (1<<14)
#define PG_DEGRADED (1<<7)
#define PG_HAS_INCOMPLETE (1<<8)
#define PG_HAS_DEGRADED (1<<9)
#define PG_HAS_MISPLACED (1<<10)
#define PG_HAS_UNCLEAN (1<<11)
#define PG_HAS_INVALID (1<<12)
#define PG_LEFT_ON_DEAD (1<<13)
// Lower bits that represent object role (EC 0/1/2... or always 0 with replication)
// 12 bits is a safe default that doesn't depend on pg_stripe_size or pg_block_size

View File

@@ -25,7 +25,6 @@ ring_loop_t::ring_loop_t(int qd)
{
free_ring_data[i] = i;
}
wait_sqe_id = 1;
}
ring_loop_t::~ring_loop_t()
@@ -83,17 +82,19 @@ void ring_loop_t::loop()
}
io_uring_cqe_seen(&ring, cqe);
}
while (get_sqe_queue.size() > 0)
{
(get_sqe_queue[0].second)();
get_sqe_queue.erase(get_sqe_queue.begin());
}
do
{
loop_again = false;
for (int i = 0; i < consumers.size(); i++)
{
consumers[i]->loop();
if (immediate_queue.size())
{
immediate_queue2.swap(immediate_queue);
for (auto & cb: immediate_queue2)
cb();
immediate_queue2.clear();
}
}
} while (loop_again);
}

View File

@@ -119,11 +119,10 @@ struct ring_consumer_t
class ring_loop_t
{
std::vector<std::pair<int,std::function<void()>>> get_sqe_queue;
std::vector<std::function<void()>> immediate_queue, immediate_queue2;
std::vector<ring_consumer_t*> consumers;
struct ring_data_t *ring_datas;
int *free_ring_data;
int wait_sqe_id;
unsigned free_ring_data_ptr;
bool loop_again;
struct io_uring ring;
@@ -145,20 +144,9 @@ public:
}
return sqe;
}
inline int wait_sqe(std::function<void()> cb)
inline void set_immediate(const std::function<void()> cb)
{
get_sqe_queue.push_back({ wait_sqe_id, cb });
return wait_sqe_id++;
}
inline void cancel_wait_sqe(int wait_id)
{
for (int i = 0; i < get_sqe_queue.size(); i++)
{
if (get_sqe_queue[i].first == wait_id)
{
get_sqe_queue.erase(get_sqe_queue.begin()+i, get_sqe_queue.begin()+i+1);
}
}
immediate_queue.push_back(cb);
}
inline int submit()
{

View File

@@ -8,7 +8,6 @@
void configure_single_pg_pool(cluster_client_t *cli)
{
cli->st_cli.on_load_pgs_hook(true);
cli->st_cli.parse_state((etcd_kv_t){
.key = "/config/pools",
.value = json11::Json::object {
@@ -43,6 +42,7 @@ void configure_single_pg_pool(cluster_client_t *cli)
{ "state", json11::Json::array { "active" } },
},
});
cli->st_cli.on_load_pgs_hook(true);
std::map<std::string, etcd_kv_t> changes;
cli->st_cli.on_change_hook(changes);
}
@@ -188,7 +188,6 @@ void test1()
int *r1 = test_write(cli, 0, 4096, 0x55);
configure_single_pg_pool(cli);
pretend_connected(cli, 1);
cli->continue_ops(true);
can_complete(r1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
@@ -196,8 +195,6 @@ void test1()
pretend_disconnected(cli, 1);
int *r2 = test_sync(cli);
pretend_connected(cli, 1);
check_op_count(cli, 1, 0);
cli->continue_ops(true);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
check_op_count(cli, 1, 1);
@@ -303,8 +300,6 @@ void test1()
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), -EPIPE);
check_disconnected(cli, 1);
pretend_connected(cli, 1);
check_op_count(cli, 1, 0);
cli->continue_ops(true);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
check_op_count(cli, 1, 1);

64
src/test_freelist.cpp Normal file
View File

@@ -0,0 +1,64 @@
// Copyright (c) Vitaliy Filippov, 2023+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <stdio.h>
#include <stdlib.h>
#include <stdexcept>
#include "freelist.cpp"
inline bool operator == (const freelist_item_t & a, const freelist_item_t & b)
{
return a.start == b.start && a.size == b.size;
}
void dump(std::vector<freelist_item_t> & freelist)
{
printf("free: ");
for (auto & item: freelist)
{
printf("%lx+%lx ", item.start, item.size);
}
printf("\n");
}
void dump(freelist_allocator_t &alloc)
{
dump(alloc.freelist);
}
uint64_t test_alloc(freelist_allocator_t &alloc, uint64_t size)
{
uint64_t r = alloc.alloc(size);
printf("alloc %lx: %lx\n", size, r);
return r;
}
void assert_eq(freelist_allocator_t &alloc, std::vector<freelist_item_t> v)
{
if (alloc.freelist != v)
{
printf("expected ");
dump(v);
printf("got ");
dump(alloc);
throw std::runtime_error("test failed");
}
dump(alloc);
}
int main(int narg, char *args[])
{
freelist_allocator_t alloc;
alloc.free(0, 0x1000000);
assert_eq(alloc, { { 0, 0x1000000 } });
assert(test_alloc(alloc, 0x1000) == 0);
assert_eq(alloc, { { 0x1000, 0xfff000 } });
assert(test_alloc(alloc, 0x4000) == 0x1000);
alloc.free(0x1000000, 0x4000);
assert_eq(alloc, { { 0x5000, 0xfff000 } });
alloc.free(0, 0x1000);
assert_eq(alloc, { { 0, 0x1000 }, { 0x5000, 0xfff000 } });
alloc.free(0x1000, 0x4000);
assert_eq(alloc, { { 0, 0x1004000 } });
return 0;
}

View File

@@ -6,7 +6,7 @@ includedir=${prefix}/@CMAKE_INSTALL_INCLUDEDIR@
Name: Vitastor
Description: Vitastor client library
Version: 0.8.4
Version: 0.8.5
Libs: -L${libdir} -lvitastor_client
Cflags: -I${includedir}