Compare commits

..

3 Commits

Author SHA1 Message Date
eb9fc274e8 Debug prints 2021-04-29 02:08:28 +03:00
9681b62204 WIP multi-queue RDMA 2021-04-29 02:08:28 +03:00
8faf8f7b58 Inline bitmaps
Handy for zero-copy RDMA tests (removes 4-byte s/g entries)
2021-04-29 02:07:46 +03:00
41 changed files with 891 additions and 607 deletions

View File

@@ -2,6 +2,6 @@ cmake_minimum_required(VERSION 2.8)
project(vitastor)
set(VERSION "0.6.3")
set(VERSION "0.6.2")
add_subdirectory(src)

View File

@@ -49,7 +49,6 @@ Vitastor на данный момент находится в статусе п
- Именование инодов через хранение их метаданных в etcd
- Снапшоты и copy-on-write клоны
- Сглаживание производительности случайной записи в SSD+HDD конфигурациях
- Поддержка RDMA/RoCEv2 через libibverbs
## Планы развития
@@ -61,7 +60,7 @@ Vitastor на данный момент находится в статусе п
- Фоновая проверка целостности без контрольных сумм (сверка реплик)
- Контрольные суммы
- Поддержка SSD-кэширования (tiered storage)
- Поддержка NVDIMM
- Поддержка RDMA и NVDIMM
- Web-интерфейс
- Возможно, сжатие
- Возможно, поддержка кэширования данных через системный page cache

View File

@@ -43,7 +43,6 @@ breaking changes in the future. However, the following is implemented:
- Inode metadata storage in etcd
- Snapshots and copy-on-write image clones
- Write throttling to smooth random write workloads in SSD+HDD configurations
- RDMA/RoCEv2 support via libibverbs
## Roadmap
@@ -55,7 +54,7 @@ breaking changes in the future. However, the following is implemented:
- Scrubbing without checksums (verification of replicas)
- Checksums
- Tiered storage
- NVDIMM support
- RDMA and NVDIMM support
- Web GUI
- Compression (possibly)
- Read caching using system page cache (possibly)

14
debian/changelog vendored
View File

@@ -1,18 +1,8 @@
vitastor (0.6.3-1) unstable; urgency=medium
vitastor (0.6.2-1) unstable; urgency=medium
* RDMA support
* Bugfixes
-- Vitaliy Filippov <vitalif@yourcmc.ru> Sat, 01 May 2021 18:46:10 +0300
vitastor (0.6.0-1) unstable; urgency=medium
* Snapshots and Copy-on-Write clones
* Image metadata in etcd (name, size)
* Image I/O and space statistics in etcd
* Write throttling for smoothing random write workloads in SSD+HDD configurations
-- Vitaliy Filippov <vitalif@yourcmc.ru> Sun, 11 Apr 2021 00:49:18 +0300
-- Vitaliy Filippov <vitalif@yourcmc.ru> Tue, 02 Feb 2021 23:01:24 +0300
vitastor (0.5.1-1) unstable; urgency=medium

2
debian/control vendored
View File

@@ -2,7 +2,7 @@ Source: vitastor
Section: admin
Priority: optional
Maintainer: Vitaliy Filippov <vitalif@yourcmc.ru>
Build-Depends: debhelper, liburing-dev (>= 0.6), g++ (>= 8), libstdc++6 (>= 8), linux-libc-dev, libgoogle-perftools-dev, libjerasure-dev, libgf-complete-dev, libibverbs-dev
Build-Depends: debhelper, liburing-dev (>= 0.6), g++ (>= 8), libstdc++6 (>= 8), linux-libc-dev, libgoogle-perftools-dev, libjerasure-dev, libgf-complete-dev
Standards-Version: 4.5.0
Homepage: https://vitastor.io/
Rules-Requires-Root: no

View File

@@ -22,7 +22,7 @@ RUN apt-get -y build-dep qemu
RUN apt-get -y build-dep fio
RUN apt-get --download-only source qemu
RUN apt-get --download-only source fio
RUN apt-get update && apt-get -y install libjerasure-dev cmake libibverbs-dev
RUN apt-get -y install libjerasure-dev cmake
ADD . /root/vitastor
RUN set -e -x; \
@@ -40,10 +40,10 @@ RUN set -e -x; \
mkdir -p /root/packages/vitastor-$REL; \
rm -rf /root/packages/vitastor-$REL/*; \
cd /root/packages/vitastor-$REL; \
cp -r /root/vitastor vitastor-0.6.3; \
ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.6.3/qemu; \
ln -s /root/fio-build/fio-*/ vitastor-0.6.3/fio; \
cd vitastor-0.6.3; \
cp -r /root/vitastor vitastor-0.6.2; \
ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.6.2/qemu; \
ln -s /root/fio-build/fio-*/ vitastor-0.6.2/fio; \
cd vitastor-0.6.2; \
FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
QEMU=$(head -n1 qemu/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
sh copy-qemu-includes.sh; \
@@ -59,8 +59,8 @@ RUN set -e -x; \
echo "dep:fio=$FIO" > debian/substvars; \
echo "dep:qemu=$QEMU" >> debian/substvars; \
cd /root/packages/vitastor-$REL; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.3.orig.tar.xz vitastor-0.6.3; \
cd vitastor-0.6.3; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.2.orig.tar.xz vitastor-0.6.2; \
cd vitastor-0.6.2; \
V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \

View File

@@ -41,12 +41,6 @@ const etcd_allow = new RegExp('^'+[
const etcd_tree = {
config: {
/* global: {
// WARNING: NOT ALL OF THESE ARE ACTUALLY CONFIGURABLE HERE
// THIS IS JUST A POOR'S MAN CONFIG DOCUMENTATION
// etcd connection
config_path: "/etc/vitastor/vitastor.conf",
etcd_address: "10.0.115.10:2379/v3",
etcd_prefix: "/vitastor",
// mon
etcd_mon_ttl: 30, // min: 10
etcd_mon_timeout: 1000, // ms. min: 0
@@ -56,17 +50,7 @@ const etcd_tree = {
osd_out_time: 600, // seconds. min: 0
placement_levels: { datacenter: 1, rack: 2, host: 3, osd: 4, ... },
// client and osd
tcp_header_buffer_size: 65536,
use_sync_send_recv: false,
use_rdma: true,
rdma_device: null, // for example, "rocep5s0f0"
rdma_port_num: 1,
rdma_gid_index: 0,
rdma_mtu: 4096,
rdma_max_sge: 128,
rdma_max_send: 32,
rdma_max_recv: 8,
rdma_max_msg: 1048576,
log_level: 0,
block_size: 131072,
disk_alignment: 4096,

View File

@@ -48,4 +48,4 @@ FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Ve
QEMU=`rpm -qi qemu qemu-kvm | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
perl -i -pe 's/(Requires:\s*qemu(?:-kvm)?)([^\n]+)?/$1 = '$QEMU'/' $VITASTOR/rpm/vitastor-el$EL.spec
tar --transform 's#^#vitastor-0.6.3/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.3$(rpm --eval '%dist').tar.gz *
tar --transform 's#^#vitastor-0.6.2/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.2$(rpm --eval '%dist').tar.gz *

View File

@@ -17,7 +17,6 @@ RUN rpm --nomd5 -i fio*.src.rpm
RUN rm -f /etc/yum.repos.d/CentOS-Media.repo
RUN cd ~/rpmbuild/SPECS && yum-builddep -y --enablerepo='*' --disablerepo=centos-sclo-rh --disablerepo=centos-sclo-rh-source --disablerepo=centos-sclo-sclo-testing qemu-kvm.spec
RUN cd ~/rpmbuild/SPECS && yum-builddep -y --enablerepo='*' --disablerepo=centos-sclo-rh --disablerepo=centos-sclo-rh-source --disablerepo=centos-sclo-sclo-testing fio.spec
RUN yum -y install rdma-core-devel
ADD https://vitastor.io/rpms/liburing-el7/liburing-0.7-2.el7.src.rpm /root
@@ -38,7 +37,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.6.3.el7.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.6.2.el7.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 0.6.3
Version: 0.6.2
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.6.3.el7.tar.gz
Source0: vitastor-0.6.2.el7.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel
@@ -14,7 +14,6 @@ BuildRequires: rh-nodejs12
BuildRequires: rh-nodejs12-npm
BuildRequires: jerasure-devel
BuildRequires: gf-complete-devel
BuildRequires: libibverbs-devel
BuildRequires: cmake
Requires: fio = 3.7-1.el7
Requires: qemu-kvm = 2.0.0-1.el7.6
@@ -62,8 +61,8 @@ cp -r mon %buildroot/usr/lib/vitastor/mon
%_libdir/libfio_vitastor.so
%_libdir/libfio_vitastor_blk.so
%_libdir/libfio_vitastor_sec.so
%_libdir/libvitastor_blk.so*
%_libdir/libvitastor_client.so*
%_libdir/libvitastor_blk.so
%_libdir/libvitastor_client.so
/usr/lib/vitastor

View File

@@ -15,7 +15,6 @@ RUN rpm --nomd5 -i qemu*.src.rpm
RUN rpm --nomd5 -i fio*.src.rpm
RUN cd ~/rpmbuild/SPECS && dnf builddep -y --enablerepo=powertools --spec qemu-kvm.spec
RUN cd ~/rpmbuild/SPECS && dnf builddep -y --enablerepo=powertools --spec fio.spec && dnf install -y cmake
RUN yum -y install libibverbs-devel
ADD https://vitastor.io/rpms/liburing-el7/liburing-0.7-2.el7.src.rpm /root
@@ -36,7 +35,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.6.3.el8.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.6.2.el8.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 0.6.3
Version: 0.6.2
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.6.3.el8.tar.gz
Source0: vitastor-0.6.2.el8.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel
@@ -13,7 +13,6 @@ BuildRequires: gcc-toolset-9-gcc-c++
BuildRequires: nodejs >= 10
BuildRequires: jerasure-devel
BuildRequires: gf-complete-devel
BuildRequires: libibverbs-devel
BuildRequires: cmake
Requires: fio = 3.7-3.el8
Requires: qemu-kvm = 4.2.0-29.el8.6
@@ -59,8 +58,8 @@ cp -r mon %buildroot/usr/lib/vitastor
%_libdir/libfio_vitastor.so
%_libdir/libfio_vitastor_blk.so
%_libdir/libfio_vitastor_sec.so
%_libdir/libvitastor_blk.so*
%_libdir/libvitastor_client.so*
%_libdir/libvitastor_blk.so
%_libdir/libvitastor_client.so
/usr/lib/vitastor

View File

@@ -13,7 +13,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}")
endif()
add_definitions(-DVERSION="0.6.3")
add_definitions(-DVERSION="0.6.2")
add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -I ${CMAKE_SOURCE_DIR}/src)
if (${WITH_ASAN})
add_definitions(-fsanitize=address -fno-omit-frame-pointer)
@@ -72,15 +72,14 @@ target_link_libraries(fio_vitastor_blk
)
# libvitastor_common.a
set(MSGR_RDMA "")
if (IBVERBS_LIBRARIES)
set(MSGR_RDMA "msgr_rdma.cpp")
endif (IBVERBS_LIBRARIES)
add_library(vitastor_common STATIC
epoll_manager.cpp etcd_state_client.cpp
messenger.cpp msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp
http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp ${MSGR_RDMA}
http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp
)
if (IBVERBS_LIBRARIES)
target_sources(vitastor_common PRIVATE msgr_rdma.cpp)
endif (IBVERBS_LIBRARIES)
target_compile_options(vitastor_common PUBLIC -fPIC)
# vitastor-osd

View File

@@ -43,6 +43,11 @@ int blockstore_t::read_bitmap(object_id oid, uint64_t target_version, void *bitm
return impl->read_bitmap(oid, target_version, bitmap, result_version);
}
std::unordered_map<object_id, uint64_t> & blockstore_t::get_unstable_writes()
{
return impl->unstable_writes;
}
std::map<uint64_t, uint64_t> & blockstore_t::get_inode_space_stats()
{
return impl->inode_space_stats;

View File

@@ -183,6 +183,9 @@ public:
// Simplified synchronous operation: get object bitmap & current version
int read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version = NULL);
// Unstable writes are added here (map of object_id -> version)
std::unordered_map<object_id, uint64_t> & get_unstable_writes();
// Get per-inode space usage statistics
std::map<uint64_t, uint64_t> & get_inode_space_stats();

View File

@@ -16,8 +16,6 @@
cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config)
{
config = osd_messenger_t::read_config(config);
this->ringloop = ringloop;
this->tfd = tfd;
this->config = config;
@@ -954,14 +952,13 @@ int cluster_client_t::continue_sync(cluster_op_t *op)
return 1;
}
// Check that all OSD connections are still alive
for (auto do_it = dirty_osds.begin(); do_it != dirty_osds.end(); )
for (auto sync_osd: dirty_osds)
{
osd_num_t sync_osd = *do_it;
auto peer_it = msgr.osd_peer_fds.find(sync_osd);
if (peer_it == msgr.osd_peer_fds.end())
dirty_osds.erase(do_it++);
else
do_it++;
{
return 0;
}
}
// Post sync to affected OSDs
for (auto & prev_op: dirty_buffers)

View File

@@ -50,11 +50,6 @@ void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function<
void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback)
{
if (!etcd_addresses.size())
{
fprintf(stderr, "etcd_address is missing in Vitastor configuration\n");
exit(1);
}
std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()];
std::string etcd_api_path;
int pos = etcd_address.find('/');
@@ -90,7 +85,7 @@ void etcd_state_client_t::add_etcd_url(std::string addr)
}
}
void etcd_state_client_t::parse_config(const json11::Json & config)
void etcd_state_client_t::parse_config(json11::Json & config)
{
this->etcd_addresses.clear();
if (config["etcd_address"].is_string())
@@ -127,11 +122,6 @@ void etcd_state_client_t::parse_config(const json11::Json & config)
void etcd_state_client_t::start_etcd_watcher()
{
if (!etcd_addresses.size())
{
fprintf(stderr, "etcd_address is missing in Vitastor configuration\n");
exit(1);
}
std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()];
std::string etcd_api_path;
int pos = etcd_address.find('/');
@@ -352,7 +342,7 @@ void etcd_state_client_t::load_pgs()
});
}
#else
void etcd_state_client_t::parse_config(const json11::Json & config)
void etcd_state_client_t::parse_config(json11::Json & config)
{
}

View File

@@ -106,7 +106,7 @@ public:
void load_global_config();
void load_pgs();
void parse_state(const etcd_kv_t & kv);
void parse_config(const json11::Json & config);
void parse_config(json11::Json & config);
inode_watch_t* watch_inode(std::string name);
void close_watch(inode_watch_t* watch);
~etcd_state_client_t();

View File

@@ -24,6 +24,7 @@
#include <netinet/tcp.h>
#include <vector>
#include <unordered_map>
#include "epoll_manager.h"
#include "cluster_client.h"
@@ -45,7 +46,6 @@ struct sec_data
struct sec_options
{
int __pad;
char *config_path = NULL;
char *etcd_host = NULL;
char *etcd_prefix = NULL;
char *image = NULL;
@@ -54,22 +54,12 @@ struct sec_options
int cluster_log = 0;
int trace = 0;
int use_rdma = 0;
char *rdma_device = NULL;
int rdma_port_num = 0;
int rdma_gid_index = 0;
int rdma_mtu = 0;
};
static struct fio_option options[] = {
{
.name = "conf",
.lname = "Vitastor config path",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct sec_options, config_path),
.help = "Vitastor config path",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "etcd",
.lname = "etcd address",
@@ -141,25 +131,6 @@ static struct fio_option options[] = {
.type = FIO_OPT_BOOL,
.off1 = offsetof(struct sec_options, use_rdma),
.help = "Use RDMA",
.def = "-1",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "rdma_device",
.lname = "RDMA device name",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct sec_options, rdma_device),
.help = "RDMA device name",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "rdma_port_num",
.lname = "RDMA port number",
.type = FIO_OPT_INT,
.off1 = offsetof(struct sec_options, rdma_port_num),
.help = "RDMA port number",
.def = "0",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
@@ -174,16 +145,6 @@ static struct fio_option options[] = {
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "rdma_mtu",
.lname = "RDMA path MTU",
.type = FIO_OPT_INT,
.off1 = offsetof(struct sec_options, rdma_mtu),
.help = "RDMA path MTU",
.def = "0",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = NULL,
},
@@ -194,6 +155,12 @@ static int sec_setup(struct thread_data *td)
sec_options *o = (sec_options*)td->eo;
sec_data *bsd;
if (!o->etcd_host)
{
td_verror(td, EINVAL, "etcd address is missing");
return 1;
}
bsd = new sec_data;
if (!bsd)
{
@@ -209,26 +176,13 @@ static int sec_setup(struct thread_data *td)
td->o.open_files++;
}
json11::Json::object cfg;
if (o->config_path)
cfg["config_path"] = std::string(o->config_path);
if (o->etcd_host)
cfg["etcd_address"] = std::string(o->etcd_host);
if (o->etcd_prefix)
cfg["etcd_prefix"] = std::string(o->etcd_prefix);
if (o->rdma_device)
cfg["rdma_device"] = std::string(o->rdma_device);
if (o->rdma_port_num)
cfg["rdma_port_num"] = o->rdma_port_num;
if (o->rdma_gid_index)
cfg["rdma_gid_index"] = o->rdma_gid_index;
if (o->rdma_mtu)
cfg["rdma_mtu"] = o->rdma_mtu;
if (o->cluster_log)
cfg["log_level"] = o->cluster_log;
if (o->use_rdma != -1)
cfg["use_rdma"] = o->use_rdma;
json11::Json cfg_json(cfg);
json11::Json cfg = json11::Json::object {
{ "etcd_address", std::string(o->etcd_host) },
{ "etcd_prefix", std::string(o->etcd_prefix ? o->etcd_prefix : "/vitastor") },
{ "log_level", o->cluster_log },
{ "use_rdma", o->use_rdma },
{ "rdma_gid_index", o->rdma_gid_index },
};
if (!o->image)
{
@@ -253,7 +207,7 @@ static int sec_setup(struct thread_data *td)
}
bsd->ringloop = new ring_loop_t(512);
bsd->epmgr = new epoll_manager_t(bsd->ringloop);
bsd->cli = new cluster_client_t(bsd->ringloop, bsd->epmgr->tfd, cfg_json);
bsd->cli = new cluster_client_t(bsd->ringloop, bsd->epmgr->tfd, cfg);
if (o->image)
{
while (!bsd->cli->is_ready())

View File

@@ -131,46 +131,39 @@ void osd_messenger_t::parse_config(const json11::Json & config)
{
#ifdef WITH_RDMA
if (!config["use_rdma"].is_null())
{
// RDMA is on by default in RDMA-enabled builds
this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
}
this->rdma_device = config["rdma_device"].string_value();
this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
if (!this->rdma_port_num)
this->rdma_port_num = 1;
this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
this->rdma_max_sge = config["rdma_max_sge"].uint64_value();
if (!this->rdma_max_sge)
this->rdma_max_sge = 128;
this->rdma_max_send = config["rdma_max_send"].uint64_value();
if (!this->rdma_max_send)
this->rdma_max_send = 32;
this->rdma_max_recv = config["rdma_max_recv"].uint64_value();
if (!this->rdma_max_recv)
this->rdma_max_recv = 8;
this->rdma_max_msg = config["rdma_max_msg"].uint64_value();
if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024)
this->rdma_max_msg = 1024*1024;
#endif
this->receive_buffer_size = (uint32_t)config["tcp_header_buffer_size"].uint64_value();
if (!this->receive_buffer_size || this->receive_buffer_size > 1024*1024*1024)
this->receive_buffer_size = 65536;
this->bs_bitmap_granularity = strtoull(config["bitmap_granularity"].string_value().c_str(), NULL, 10);
if (!this->bs_bitmap_granularity)
this->bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
config["use_sync_send_recv"].uint64_value();
this->peer_connect_interval = config["peer_connect_interval"].uint64_value();
if (!this->peer_connect_interval)
this->peer_connect_interval = 5;
{
this->peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
}
this->peer_connect_timeout = config["peer_connect_timeout"].uint64_value();
if (!this->peer_connect_timeout)
this->peer_connect_timeout = 5;
{
this->peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
}
this->osd_idle_timeout = config["osd_idle_timeout"].uint64_value();
if (!this->osd_idle_timeout)
this->osd_idle_timeout = 5;
{
this->osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
}
this->osd_ping_timeout = config["osd_ping_timeout"].uint64_value();
if (!this->osd_ping_timeout)
this->osd_ping_timeout = 5;
{
this->osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
}
this->log_level = config["log_level"].uint64_value();
}
@@ -380,12 +373,25 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
#ifdef WITH_RDMA
if (rdma_context)
{
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
if (cl->rdma_conn)
for (int i = 0; i < rdma_queues_per_connection; i++)
{
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge);
if (!rdma_conn)
{
break;
}
cl->rdma_queues.push_back(rdma_conn);
}
if (cl->rdma_queues.size())
{
json11::Json::array addresses;
for (auto rdma_conn: cl->rdma_queues)
{
addresses.push_back(rdma_conn->addr.to_string());
}
json11::Json payload = json11::Json::object {
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
{ "rdma_max_msg", cl->rdma_conn->max_msg },
{ "rdma_queues", addresses },
{ "rdma_max_sge", rdma_max_sge },
};
std::string payload_str = payload.dump();
op->req.show_conf.json_len = payload_str.size();
@@ -437,41 +443,11 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
return;
}
#ifdef WITH_RDMA
if (config["rdma_address"].is_string())
if (!connect_rdma_server(cl, config["rdma_queues"], config["rdma_max_sge"].uint64_value()))
{
msgr_rdma_address_t addr;
if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) ||
cl->rdma_conn->connect(&addr) != 0)
{
printf(
"Failed to connect to OSD %lu (address %s) using RDMA\n",
cl->osd_num, config["rdma_address"].string_value().c_str()
);
delete cl->rdma_conn;
cl->rdma_conn = NULL;
// FIXME: Keep TCP connection in this case
osd_num_t peer_osd = cl->osd_num;
stop_client(cl->peer_fd);
on_connect_peer(peer_osd, -1);
delete op;
return;
}
else
{
uint64_t server_max_msg = config["rdma_max_msg"].uint64_value();
if (cl->rdma_conn->max_msg > server_max_msg)
{
cl->rdma_conn->max_msg = server_max_msg;
}
if (log_level > 0)
{
printf("Connected to OSD %lu using RDMA\n", cl->osd_num);
}
cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, NULL);
// Add the initial receive request
try_recv_rdma(cl);
}
// FIXME: Keep TCP connection in this case
delete op;
return;
}
#endif
osd_peer_fds[cl->osd_num] = cl->peer_fd;
@@ -520,52 +496,3 @@ bool osd_messenger_t::is_rdma_enabled()
{
return rdma_context != NULL;
}
json11::Json osd_messenger_t::read_config(const json11::Json & config)
{
const char *config_path = config["config_path"].string_value() != ""
? config["config_path"].string_value().c_str() : VITASTOR_CONFIG_PATH;
int fd = open(config_path, O_RDONLY);
if (fd < 0)
{
if (errno != ENOENT)
fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno));
return config;
}
struct stat st;
if (fstat(fd, &st) != 0)
{
fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno));
close(fd);
return config;
}
std::string buf;
buf.resize(st.st_size);
int done = 0;
while (done < st.st_size)
{
int r = read(fd, (void*)buf.data()+done, st.st_size-done);
if (r < 0)
{
fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno));
close(fd);
return config;
}
done += r;
}
close(fd);
std::string json_err;
json11::Json::object file_config = json11::Json::parse(buf, json_err).object_items();
if (json_err != "")
{
fprintf(stderr, "Invalid JSON in %s: %s\n", config_path, json_err.c_str());
return config;
}
file_config.erase("config_path");
file_config.erase("osd_num");
for (auto kv: config.object_items())
{
file_config[kv.first] = kv.second;
}
return file_config;
}

View File

@@ -33,11 +33,14 @@
#define PEER_RDMA 4
#define PEER_STOPPED 5
#define DEFAULT_PEER_CONNECT_INTERVAL 5
#define DEFAULT_PEER_CONNECT_TIMEOUT 5
#define DEFAULT_OSD_PING_TIMEOUT 5
#define DEFAULT_BITMAP_GRANULARITY 4096
#define VITASTOR_CONFIG_PATH "/etc/vitastor/vitastor.conf"
#define MSGR_SENDP_HDR 1
#define MSGR_SENDP_FREE 2
#define MSGR_SENDP_BMP 4
struct msgr_sendp_t
{
@@ -61,7 +64,7 @@ struct osd_client_t
void *in_buf = NULL;
#ifdef WITH_RDMA
msgr_rdma_connection_t *rdma_conn = NULL;
std::vector<msgr_rdma_connection_t*> rdma_queues;
#endif
// Read state
@@ -120,11 +123,13 @@ struct osd_messenger_t
protected:
int keepalive_timer_id = -1;
uint32_t receive_buffer_size = 0;
int peer_connect_interval = 0;
int peer_connect_timeout = 0;
int osd_idle_timeout = 0;
int osd_ping_timeout = 0;
// FIXME: make receive_buffer_size configurable
int receive_buffer_size = 64*1024;
int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
int osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
int osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
uint32_t bs_bitmap_granularity = 0;
int log_level = 0;
bool use_sync_send_recv = false;
@@ -133,8 +138,8 @@ protected:
std::string rdma_device;
uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0;
msgr_rdma_context_t *rdma_context = NULL;
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 8;
uint64_t rdma_max_msg = 0;
int rdma_queues_per_connection = 128;
int rdma_max_sge = 128, rdma_max_send = 32, rdma_max_recv = 32;
#endif
std::vector<int> read_ready_clients;
@@ -165,11 +170,10 @@ public:
void accept_connections(int listen_fd);
~osd_messenger_t();
static json11::Json read_config(const json11::Json & config);
#ifdef WITH_RDMA
bool is_rdma_enabled();
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
bool connect_rdma_client(osd_client_t *cl, json11::Json rdma_addresses, uint64_t client_max_sge);
int get_rdma_max_sge();
#endif
protected:
@@ -187,15 +191,15 @@ protected:
void handle_send(int result, osd_client_t *cl);
bool handle_read(int result, osd_client_t *cl);
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
bool handle_finished_read(osd_client_t *cl);
void handle_op_hdr(osd_client_t *cl);
bool handle_reply_hdr(osd_client_t *cl);
void handle_reply_ready(osd_op_t *op);
#ifdef WITH_RDMA
bool try_send_rdma(osd_client_t *cl);
bool try_recv_rdma(osd_client_t *cl);
void try_send_rdma(osd_client_t *cl);
void try_recv_rdma(osd_client_t *cl, msgr_rdma_connection_t *rc);
void handle_rdma_events();
bool connect_rdma_server(osd_client_t *cl, json11::Json rdma_addresses, uint64_t server_max_sge);
#endif
};

View File

@@ -42,8 +42,3 @@ void osd_messenger_t::read_requests()
void osd_messenger_t::send_replies()
{
}
json11::Json osd_messenger_t::read_config(const json11::Json & config)
{
return config;
}

View File

@@ -1,6 +1,3 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <stdio.h>
#include <stdlib.h>
#include "msgr_rdma.h"
@@ -166,8 +163,7 @@ cleanup:
return NULL;
}
msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send,
uint32_t max_recv, uint32_t max_sge, uint32_t max_msg)
msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge)
{
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
@@ -177,7 +173,6 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
conn->max_send = max_send;
conn->max_recv = max_recv;
conn->max_sge = max_sge;
conn->max_msg = max_msg;
ctx->used_max_cqe += max_send+max_recv;
if (ctx->used_max_cqe > ctx->max_cqe)
@@ -298,139 +293,500 @@ int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest)
return 0;
}
bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg)
// Being the client, connect all server's RDMA queues to our local (client) queues
bool osd_messenger_t::connect_rdma_server(osd_client_t *cl, json11::Json rdma_addresses, uint64_t server_max_sge)
{
// Try to connect to the peer using RDMA
msgr_rdma_address_t addr;
if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr))
if (rdma_addresses.array_items().size() > 0)
{
if (client_max_msg > rdma_max_msg)
if (!server_max_sge || server_max_sge > rdma_max_sge)
{
client_max_msg = rdma_max_msg;
server_max_sge = rdma_max_sge;
}
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg);
if (rdma_conn)
int n_conn = rdma_addresses.array_items().size();
if (n_conn < cl->rdma_queues.size())
{
int r = rdma_conn->connect(&addr);
if (r != 0)
for (int i = n_conn; i < cl->rdma_queues.size(); i++)
{
delete cl->rdma_queues[i];
}
cl->rdma_queues.resize(n_conn);
}
else if (n_conn > cl->rdma_queues.size())
{
n_conn = cl->rdma_queues.size();
}
for (int i = 0; i < n_conn; i++)
{
msgr_rdma_address_t addr;
if (!msgr_rdma_address_t::from_string(rdma_addresses[i].string_value().c_str(), &addr) ||
cl->rdma_queues[i]->connect(&addr) != 0)
{
delete rdma_conn;
printf(
"Failed to connect RDMA queue pair to %s (client %d)\n",
addr.to_string().c_str(), peer_fd
"Failed to connect to OSD %lu (address %s) using RDMA\n",
cl->osd_num, rdma_addresses[i].string_value().c_str()
);
// FIXME: Keep TCP connection in this case
osd_num_t peer_osd = cl->osd_num;
stop_client(cl->peer_fd);
on_connect_peer(peer_osd, -1);
return false;
}
else
{
// Remember connection, but switch to RDMA only after sending the configuration response
auto cl = clients.at(peer_fd);
cl->rdma_conn = rdma_conn;
cl->peer_state = PEER_RDMA_CONNECTING;
return true;
printf("Connected local queue %d to OSD %lu queue %d using RDMA\n", cl->rdma_queues[i]->qp->qp_num, cl->osd_num, addr.qpn);
if (cl->rdma_queues[i]->max_sge > server_max_sge)
{
cl->rdma_queues[i]->max_sge = server_max_sge;
}
}
}
cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, NULL);
}
return false;
else
{
for (auto rdma_conn: cl->rdma_queues)
{
delete rdma_conn;
}
cl->rdma_queues.resize(0);
}
return true;
}
static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
// Being the server, try to connect all client's RDMA queues to our local (server) queues
bool osd_messenger_t::connect_rdma_client(osd_client_t *cl, json11::Json rdma_addresses, uint64_t client_max_sge)
{
if (rdma_addresses.array_items().size() > 0)
{
if (!client_max_sge || client_max_sge > rdma_max_sge)
{
client_max_sge = rdma_max_sge;
}
int n_conn = rdma_addresses.array_items().size();
if (n_conn > rdma_queues_per_connection)
{
n_conn = rdma_queues_per_connection;
}
for (int i = 0; i < n_conn; i++)
{
msgr_rdma_address_t addr;
if (msgr_rdma_address_t::from_string(rdma_addresses[i].string_value().c_str(), &addr))
{
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, client_max_sge);
if (rdma_conn && rdma_conn->connect(&addr) == 0)
{
printf("Connected local queue %d to client %d queue %d using RDMA\n", rdma_conn->qp->qp_num, cl->peer_fd, addr.qpn);
cl->rdma_queues.push_back(rdma_conn);
}
else
{
if (rdma_conn)
{
delete rdma_conn;
}
printf(
"Failed to connect RDMA queue pair to %s (client %d queue %d)\n",
addr.to_string().c_str(), cl->peer_fd, i+1
);
// Delete all RDMA queues to keep the TCP connection
for (int j = 0; j < cl->rdma_queues.size(); j++)
{
delete cl->rdma_queues[j];
}
cl->rdma_queues.resize(0);
return false;
}
}
}
// Switch to RDMA state only after sending the configuration response
cl->peer_state = PEER_RDMA_CONNECTING;
for (int i = 0; i < cl->rdma_queues.size(); i++)
{
try_recv_rdma(cl, cl->rdma_queues[i]);
}
}
return true;
}
static void try_send_rdma_wr(msgr_rdma_connection_t *rc, uint64_t wr_id, ibv_sge *sge, int op_sge)
{
timespec tv;
clock_gettime(CLOCK_REALTIME, &tv);
uint64_t total = 0;
for (int i = 0; i < op_sge; i++)
total += sge[i].length;
printf("%lu.%09lu RDMA send to queue %d: %lu bytes\n", tv.tv_sec, tv.tv_nsec, rc->qp->qp_num, total);
ibv_send_wr *bad_wr = NULL;
ibv_send_wr wr = {
.wr_id = (uint64_t)(cl->peer_fd*2+1),
.wr_id = wr_id,
.sg_list = sge,
.num_sge = op_sge,
.opcode = IBV_WR_SEND,
.send_flags = IBV_SEND_SIGNALED,
};
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
int err = ibv_post_send(rc->qp, &wr, &bad_wr);
if (err || bad_wr)
{
printf("RDMA send failed: %s\n", strerror(err));
exit(1);
}
cl->rdma_conn->cur_send++;
rc->cur_send++;
}
bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
{
auto rc = cl->rdma_conn;
if (!cl->send_list.size() || rc->cur_send > 0)
{
// Only send one batch at a time
return true;
}
uint64_t op_size = 0, op_sge = 0;
ibv_sge sge[rc->max_sge];
while (rc->send_pos < cl->send_list.size())
{
iovec & iov = cl->send_list[rc->send_pos];
if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
{
try_send_rdma_wr(cl, sge, op_sge);
op_sge = 0;
op_size = 0;
if (rc->cur_send >= rc->max_send)
{
break;
}
}
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg
? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size);
sge[op_sge++] = {
.addr = (uintptr_t)(iov.iov_base+rc->send_buf_pos),
.length = len,
.lkey = rc->ctx->mr->lkey,
};
op_size += len;
rc->send_buf_pos += len;
if (rc->send_buf_pos >= iov.iov_len)
{
rc->send_pos++;
rc->send_buf_pos = 0;
}
}
if (op_sge > 0)
{
try_send_rdma_wr(cl, sge, op_sge);
}
return true;
}
static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
static void try_recv_rdma_wr(msgr_rdma_connection_t *rc, uint64_t wr_id, ibv_sge *sge, int op_sge)
{
timespec tv;
clock_gettime(CLOCK_REALTIME, &tv);
uint64_t total = 0;
for (int i = 0; i < op_sge; i++)
total += sge[i].length;
printf("%lu.%09lu RDMA receive from queue %d: %lu bytes\n", tv.tv_sec, tv.tv_nsec, rc->qp->qp_num, total);
ibv_recv_wr *bad_wr = NULL;
ibv_recv_wr wr = {
.wr_id = (uint64_t)(cl->peer_fd*2),
.wr_id = wr_id,
.sg_list = sge,
.num_sge = op_sge,
};
int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr);
int err = ibv_post_recv(rc->qp, &wr, &bad_wr);
if (err || bad_wr)
{
printf("RDMA receive failed: %s\n", strerror(err));
exit(1);
}
cl->rdma_conn->cur_recv++;
rc->cur_recv++;
}
bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
static bool try_recv_rdma_read(osd_client_t *cl, msgr_rdma_connection_t *rc, osd_op_t *cur_op, uint32_t bs_bitmap_granularity)
{
auto rc = cl->rdma_conn;
while (rc->cur_recv < rc->max_recv)
int op_size = bs_bitmap_granularity, op_sge = 1, op_max = rc->max_sge*bs_bitmap_granularity;
iovec *segments = cur_op->iov.get_iovec();
ibv_sge sge[rc->max_sge];
sge[0] = {
.addr = (uintptr_t)cur_op->reply.buf,
.length = (uint32_t)OSD_PACKET_SIZE,
.lkey = rc->ctx->mr->lkey,
};
while (rc->recv_pos < cur_op->iov.get_size())
{
void *buf = malloc_or_die(rc->max_msg);
rc->recv_buffers.push_back(buf);
ibv_sge sge = {
.addr = (uintptr_t)buf,
.length = (uint32_t)rc->max_msg,
iovec & iov = segments[rc->recv_pos];
if (op_size >= op_max || op_sge >= rc->max_sge)
{
try_recv_rdma_wr(rc, cl->peer_fd, sge, op_sge);
op_sge = 0;
op_size = 0;
if (rc->cur_recv >= rc->max_recv)
{
// FIXME
exit(1);
}
}
// Receive in (max_sge*4k) fragments
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->recv_buf_pos < op_max
? iov.iov_len-rc->recv_buf_pos : op_max-op_size);
sge[op_sge++] = {
.addr = (uintptr_t)(iov.iov_base+rc->recv_buf_pos),
.length = len,
.lkey = rc->ctx->mr->lkey,
};
try_recv_rdma_wr(cl, &sge, 1);
op_size += len;
rc->recv_buf_pos += len;
if (rc->recv_buf_pos >= iov.iov_len)
{
rc->recv_pos++;
rc->recv_buf_pos = 0;
}
}
if (op_sge > 0)
{
try_recv_rdma_wr(rc, cl->peer_fd, sge, op_sge);
}
rc->recv_pos = 0;
rc->recv_buf_pos = 0;
return true;
}
static bool try_send_rdma_read(osd_client_t *cl, msgr_rdma_connection_t *rc, osd_op_t *cur_op, int op_list_size, uint32_t bs_bitmap_granularity)
{
ibv_sge sge[rc->max_sge];
int op_size = bs_bitmap_granularity, op_sge = 1, op_max = rc->max_sge*bs_bitmap_granularity;
sge[0] = {
.addr = (uintptr_t)cl->send_list[0].iov_base,
.length = (uint32_t)cl->send_list[0].iov_len,
.lkey = rc->ctx->mr->lkey,
};
rc->send_pos = 1;
while (rc->send_pos < op_list_size)
{
iovec & iov = cl->send_list[rc->send_pos];
if (cl->outbox[rc->send_pos].flags & MSGR_SENDP_HDR)
{
if (op_sge > 0)
{
try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge);
op_sge = 0;
op_size = 0;
if (rc->cur_send >= rc->max_send)
break;
}
assert(rc->send_buf_pos == 0);
sge[0] = {
.addr = (uintptr_t)iov.iov_base,
.length = (uint32_t)iov.iov_len,
.lkey = rc->ctx->mr->lkey,
};
try_send_rdma_wr(rc, cl->peer_fd, sge, 1);
rc->send_pos++;
if (rc->cur_send >= rc->max_send)
break;
}
else
{
if (op_size >= op_max || op_sge >= rc->max_sge)
{
try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge);
op_sge = 0;
op_size = 0;
if (rc->cur_send >= rc->max_send)
break;
}
// Fragment all messages into parts no longer than (max_sge*4k) = 120k on ConnectX-4
// Otherwise the client may not be able to receive them in small parts
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < op_max ? iov.iov_len-rc->send_buf_pos : op_max-op_size);
sge[op_sge++] = {
.addr = (uintptr_t)(iov.iov_base+rc->send_buf_pos),
.length = len,
.lkey = rc->ctx->mr->lkey,
};
op_size += len;
rc->send_buf_pos += len;
if (rc->send_buf_pos >= iov.iov_len)
{
rc->send_pos++;
rc->send_buf_pos = 0;
}
}
}
if (op_sge > 0)
{
try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge);
}
if (op_list_size == 1)
{
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
cur_op->req.hdr.opcode == OSD_OP_READ)
{
sge[0] = {
.addr = 0,
.length = 0,
.lkey = rc->ctx->mr->lkey,
};
uint64_t data_size = cur_op->req.hdr.opcode == OSD_OP_SEC_READ
? cur_op->req.sec_rw.len
: cur_op->req.rw.len;
while (data_size >= op_max)
{
try_send_rdma_wr(rc, cl->peer_fd, sge, 1);
data_size -= op_max;
}
if (data_size > 0)
try_send_rdma_wr(rc, cl->peer_fd, sge, 1);
}
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
{
sge[0] = {
.addr = 0,
.length = 0,
.lkey = rc->ctx->mr->lkey,
};
try_send_rdma_wr(rc, cl->peer_fd, sge, 1);
}
else
return true;
}
return true;
}
void osd_messenger_t::try_send_rdma(osd_client_t *cl)
{
// Two different algorithms for outgoing and incoming operations
while (cl->outbox.size() > 0)
{
osd_op_t *cur_op = cl->outbox[0].op;
if (cur_op->op_type == OSD_OP_OUT)
{
// Pick a queue. Send operation to it in one part.
int qi;
for (qi = 0; qi < cl->rdma_queues.size() && cl->rdma_queues[qi]->cur_op != NULL; qi++) {}
if (qi >= cl->rdma_queues.size())
{
// No free queues, retry later.
// We only post 1 operation per queue to use the queue pair number as a 'tag'.
return;
}
// Pick all entries for the operation from the queue
int op_list_size = 0;
while (op_list_size < cl->outbox.size() && cl->outbox[op_list_size].op == cur_op)
{
op_list_size++;
}
auto rq = cl->rdma_queues[qi];
rq->cur_op = cur_op;
ibv_sge sge[rq->max_sge];
// FIXME: This won't work with long bitmaps. But I don't care, I want to finally test fucking RDMA
// header or header+data
sge[0] = {
.addr = (uintptr_t)cl->send_list[0].iov_base,
.length = (uint32_t)cl->send_list[0].iov_len,
.lkey = rq->ctx->mr->lkey,
};
if (op_list_size == 2)
{
auto & iov = cl->send_list[1];
sge[1] = {
.addr = (uintptr_t)iov.iov_base,
.length = (uint32_t)iov.iov_len,
.lkey = rq->ctx->mr->lkey,
};
try_send_rdma_wr(rq, cl->peer_fd, sge, 2);
}
else if (op_list_size == 1)
{
try_send_rdma_wr(rq, cl->peer_fd, sge, 1);
}
else
{
printf("unexpected long send_list for opcode %lu: %lu entries\n", cur_op->req.hdr.opcode, cl->send_list.size());
exit(1);
}
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_list_size);
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+op_list_size);
// Post a receive request for the reply at the same time
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
cur_op->req.hdr.opcode == OSD_OP_READ)
{
try_recv_rdma_read(cl, rq, cur_op, bs_bitmap_granularity);
}
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
{
assert(!cur_op->iov.count);
// FIXME: hardcode
#define clean_entry_bitmap_size 4
// Reply size is known
uint64_t data_size = cur_op->req.sec_read_bmp.len / sizeof(obj_ver_id) * (8 + clean_entry_bitmap_size);
cur_op->rmw_buf = malloc_or_die(data_size);
sge[0] = {
.addr = (uintptr_t)cur_op->reply.buf,
.length = (uint32_t)OSD_PACKET_SIZE,
.lkey = rq->ctx->mr->lkey,
};
sge[1] = {
.addr = (uintptr_t)cur_op->rmw_buf,
.length = (uint32_t)data_size,
.lkey = rq->ctx->mr->lkey,
};
try_recv_rdma_wr(rq, cl->peer_fd, sge, 2);
}
else
{
// No reply or reply size is unknown
sge[0] = {
.addr = (uintptr_t)cur_op->reply.buf,
.length = (uint32_t)OSD_PACKET_SIZE,
.lkey = rq->ctx->mr->lkey,
};
try_recv_rdma_wr(rq, cl->peer_fd, sge, 1);
}
}
else
{
// Send reply to the same queue the operation came from.
// Fragment it into parts no longer than (max_sge*4k) to always
// be able to send and receive them correctly.
int qi;
for (qi = 0; qi < cl->rdma_queues.size() && cl->rdma_queues[qi]->cur_op != cur_op; qi++) {}
if (qi >= cl->rdma_queues.size())
{
printf("Unknown incoming operation for client %d\n", cl->peer_fd);
exit(1);
}
// Pick all entries for the operation from the queue
int op_list_size = 0;
while (op_list_size < cl->outbox.size() && cl->outbox[op_list_size].op == cur_op)
{
op_list_size++;
}
auto rq = cl->rdma_queues[qi];
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ||
cur_op->req.hdr.opcode == OSD_OP_READ)
{
try_send_rdma_read(cl, rq, cur_op, op_list_size, bs_bitmap_granularity);
rq->send_pos = 0;
rq->send_buf_pos = 0;
}
else if (op_list_size == 1)
{
ibv_sge sge[1];
sge[0] = {
.addr = (uintptr_t)cl->send_list[0].iov_base,
.length = (uint32_t)cl->send_list[0].iov_len,
.lkey = rq->ctx->mr->lkey,
};
try_send_rdma_wr(rq, cl->peer_fd, sge, 1);
}
else if (op_list_size == 2)
{
ibv_sge sge[2];
sge[0] = {
.addr = (uintptr_t)cl->send_list[0].iov_base,
.length = (uint32_t)cl->send_list[0].iov_len,
.lkey = rq->ctx->mr->lkey,
};
sge[1] = {
.addr = (uintptr_t)cl->send_list[1].iov_base,
.length = (uint32_t)cl->send_list[1].iov_len,
.lkey = rq->ctx->mr->lkey,
};
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
try_send_rdma_wr(rq, cl->peer_fd, sge, 2);
else
{
try_send_rdma_wr(rq, cl->peer_fd, sge, 1);
try_send_rdma_wr(rq, cl->peer_fd, sge+1, 1);
}
}
else if (op_list_size > 2)
{
printf("Unexpected long send_list for opcode %lu: %lu entries\n", cur_op->req.hdr.opcode, cl->send_list.size());
exit(1);
}
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_list_size);
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+op_list_size);
}
}
}
// Try to receive an incoming operation via RDMA
void osd_messenger_t::try_recv_rdma(osd_client_t *cl, msgr_rdma_connection_t *rc)
{
rc->cur_op = new osd_op_t;
rc->cur_op->peer_fd = cl->peer_fd;
rc->cur_op->op_type = OSD_OP_IN;
rc->cur_op->buf = memalign_or_die(MEM_ALIGNMENT, 128*1024); // FIXME hardcode for tests
ibv_sge sge[2];
sge[0] = {
.addr = (uintptr_t)rc->cur_op->req.buf,
.length = (uint32_t)OSD_PACKET_SIZE,
.lkey = rc->ctx->mr->lkey,
};
sge[1] = {
.addr = (uintptr_t)rc->cur_op->buf,
.length = (uint32_t)128*1024,
.lkey = rc->ctx->mr->lkey,
};
try_recv_rdma_wr(rc, cl->peer_fd, sge, 2);
}
#define RDMA_EVENTS_AT_ONCE 32
void osd_messenger_t::handle_rdma_events()
@@ -439,6 +795,7 @@ void osd_messenger_t::handle_rdma_events()
ibv_cq *ev_cq;
void *ev_ctx;
// FIXME: This is inefficient as it calls read()...
timespec tv;
if (ibv_get_cq_event(rdma_context->channel, &ev_cq, &ev_ctx) == 0)
{
ibv_ack_cq_events(rdma_context->cq, 1);
@@ -455,8 +812,8 @@ void osd_messenger_t::handle_rdma_events()
event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc);
for (int i = 0; i < event_count; i++)
{
int client_id = wc[i].wr_id >> 1;
bool is_send = wc[i].wr_id & 1;
int client_id = wc[i].wr_id;
bool is_send = wc[i].opcode == IBV_WC_SEND;
auto cl_it = clients.find(client_id);
if (cl_it == clients.end())
{
@@ -474,41 +831,120 @@ void osd_messenger_t::handle_rdma_events()
stop_client(client_id);
continue;
}
if (!is_send)
int q;
for (q = 0; q < cl->rdma_queues.size() && cl->rdma_queues[q]->qp->qp_num != wc[i].qp_num; q++) {}
if (q >= cl->rdma_queues.size())
{
cl->rdma_conn->cur_recv--;
handle_read_buffer(cl, cl->rdma_conn->recv_buffers[0], wc[i].byte_len);
free(cl->rdma_conn->recv_buffers[0]);
cl->rdma_conn->recv_buffers.erase(cl->rdma_conn->recv_buffers.begin(), cl->rdma_conn->recv_buffers.begin()+1);
try_recv_rdma(cl);
printf("Unknown queue %d for client %d\n", wc[i].qp_num, cl->peer_fd);
exit(1);
}
auto rc = cl->rdma_queues[q];
if (is_send)
{
clock_gettime(CLOCK_REALTIME, &tv);
printf("%lu.%09lu Done RDMA send on queue %d\n", tv.tv_sec, tv.tv_nsec, wc[i].qp_num);
}
else
{
cl->rdma_conn->cur_send--;
if (!cl->rdma_conn->cur_send)
clock_gettime(CLOCK_REALTIME, &tv);
printf("%lu.%09lu Done RDMA recv on queue %d, %d bytes\n", tv.tv_sec, tv.tv_nsec, wc[i].qp_num, wc[i].byte_len);
}
if (!is_send)
{
rc->cur_recv--;
if (!rc->cur_recv)
{
// Wait for the whole batch
for (int i = 0; i < cl->rdma_conn->send_pos; i++)
// Fucking shit...
if (rc->cur_op->op_type == OSD_OP_IN)
{
if (cl->outbox[i].flags & MSGR_SENDP_FREE)
if (wc[i].byte_len <= OSD_PACKET_SIZE)
{
// Reply fully sent
delete cl->outbox[i].op;
free(rc->cur_op->buf);
rc->cur_op->buf = NULL;
}
cl->received_ops.push_back(rc->cur_op);
set_immediate.push_back([this, op = rc->cur_op]() { exec_op(op); });
}
else /* if (rc->cur_op->op_type == OSD_OP_OUT) */
{
if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_READ ||
rc->cur_op->reply.hdr.opcode == OSD_OP_READ)
{
// Data is already received
cl->sent_ops.erase(rc->cur_op->req.hdr.id);
handle_reply_ready(rc->cur_op);
rc->cur_op = NULL;
try_send_rdma(cl);
}
else if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_READ_BMP)
{
// Data is already received, but we need to switch buffers
cl->sent_ops.erase(rc->cur_op->req.hdr.id);
free(rc->cur_op->buf);
rc->cur_op->buf = rc->cur_op->rmw_buf;
handle_reply_ready(rc->cur_op);
rc->cur_op = NULL;
try_send_rdma(cl);
}
else if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_LIST && rc->cur_op->reply.hdr.retval > 0 ||
rc->cur_op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && rc->cur_op->reply.hdr.retval > 0)
{
if (rc->recv_pos != 1)
{
// Data is not received yet (RNR)
uint32_t len;
if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_LIST)
len = sizeof(obj_ver_id) * rc->cur_op->reply.hdr.retval;
else
len = rc->cur_op->reply.hdr.retval;
rc->cur_op->buf = malloc_or_die(len);
ibv_sge sge[1];
sge[0] = {
.addr = (uintptr_t)rc->cur_op->buf,
.length = len,
.lkey = rc->ctx->mr->lkey,
};
try_recv_rdma_wr(rc, cl->peer_fd, sge, 1);
rc->recv_pos = 1;
}
else
{
// Done
cl->sent_ops.erase(rc->cur_op->req.hdr.id);
handle_reply_ready(rc->cur_op);
rc->cur_op = NULL;
rc->recv_pos = 0;
try_send_rdma(cl);
}
}
else
{
// No data
cl->sent_ops.erase(rc->cur_op->req.hdr.id);
handle_reply_ready(rc->cur_op);
rc->cur_op = NULL;
try_send_rdma(cl);
}
}
if (cl->rdma_conn->send_pos > 0)
}
}
else
{
rc->cur_send--;
if (!rc->cur_send)
{
if (rc->cur_op->op_type == OSD_OP_OUT)
{
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos);
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos);
cl->rdma_conn->send_pos = 0;
// Nothing
}
if (cl->rdma_conn->send_buf_pos > 0)
else /* if (rc->cur_op->op_type == OSD_OP_IN) */
{
cl->send_list[0].iov_base += cl->rdma_conn->send_buf_pos;
cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos;
cl->rdma_conn->send_buf_pos = 0;
// Reply fully sent
delete rc->cur_op;
rc->cur_op = NULL;
// Post receive for the next incoming op
try_recv_rdma(cl, rc);
}
try_send_rdma(cl);
}
}
}
@@ -519,3 +955,8 @@ void osd_messenger_t::handle_rdma_events()
}
set_immediate.clear();
}
int osd_messenger_t::get_rdma_max_sge()
{
return rdma_max_sge;
}

View File

@@ -1,11 +1,10 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#pragma once
#include <infiniband/verbs.h>
#include <string>
#include <vector>
struct osd_op_t;
struct msgr_rdma_address_t
{
ibv_gid gid;
@@ -46,13 +45,12 @@ struct msgr_rdma_connection_t
msgr_rdma_address_t addr;
int max_send = 0, max_recv = 0, max_sge = 0;
int cur_send = 0, cur_recv = 0;
uint64_t max_msg = 0;
osd_op_t *cur_op = NULL;
int send_pos = 0, send_buf_pos = 0;
int recv_pos = 0, recv_buf_pos = 0;
std::vector<void*> recv_buffers;
~msgr_rdma_connection_t();
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge);
int connect(msgr_rdma_address_t *dest);
};

View File

@@ -91,9 +91,48 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
{
if (cl->read_iov.iov_base == cl->in_buf)
{
if (!handle_read_buffer(cl, cl->in_buf, result))
// Compose operation(s) from the buffer
int remain = result;
void *curbuf = cl->in_buf;
while (remain > 0)
{
goto fin;
if (!cl->read_op)
{
cl->read_op = new osd_op_t;
cl->read_op->peer_fd = cl->peer_fd;
cl->read_op->op_type = OSD_OP_IN;
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
cl->read_remaining = OSD_PACKET_SIZE;
cl->read_state = CL_READ_HDR;
}
while (cl->recv_list.done < cl->recv_list.count && remain > 0)
{
iovec* cur = cl->recv_list.get_iovec();
if (cur->iov_len > remain)
{
memcpy(cur->iov_base, curbuf, remain);
cl->read_remaining -= remain;
cur->iov_len -= remain;
cur->iov_base += remain;
remain = 0;
}
else
{
memcpy(cur->iov_base, curbuf, cur->iov_len);
curbuf += cur->iov_len;
cl->read_remaining -= cur->iov_len;
remain -= cur->iov_len;
cur->iov_len = 0;
cl->recv_list.done++;
}
}
if (cl->recv_list.done >= cl->recv_list.count)
{
if (!handle_finished_read(cl))
{
goto fin;
}
}
}
}
else
@@ -120,52 +159,6 @@ fin:
return ret;
}
bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int remain)
{
// Compose operation(s) from the buffer
while (remain > 0)
{
if (!cl->read_op)
{
cl->read_op = new osd_op_t;
cl->read_op->peer_fd = cl->peer_fd;
cl->read_op->op_type = OSD_OP_IN;
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
cl->read_remaining = OSD_PACKET_SIZE;
cl->read_state = CL_READ_HDR;
}
while (cl->recv_list.done < cl->recv_list.count && remain > 0)
{
iovec* cur = cl->recv_list.get_iovec();
if (cur->iov_len > remain)
{
memcpy(cur->iov_base, curbuf, remain);
cl->read_remaining -= remain;
cur->iov_len -= remain;
cur->iov_base += remain;
remain = 0;
}
else
{
memcpy(cur->iov_base, curbuf, cur->iov_len);
curbuf += cur->iov_len;
cl->read_remaining -= cur->iov_len;
remain -= cur->iov_len;
cur->iov_len = 0;
cl->recv_list.done++;
}
}
if (cl->recv_list.done >= cl->recv_list.count)
{
if (!handle_finished_read(cl))
{
return false;
}
}
}
return true;
}
bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
{
cl->recv_list.reset();
@@ -214,20 +207,26 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
{
if (cur_op->req.sec_rw.attr_len > 0)
if (cur_op->req.sec_rw.bitmap_len > 0)
{
if (cur_op->req.sec_rw.attr_len > sizeof(unsigned))
cur_op->bitmap = cur_op->rmw_buf = malloc_or_die(cur_op->req.sec_rw.attr_len);
if (cur_op->req.sec_rw.bitmap_len > sizeof(void*))
cur_op->bitmap = cur_op->rmw_buf = malloc_or_die(cur_op->req.sec_rw.bitmap_len);
else
cur_op->bitmap = &cur_op->bmp_data;
cl->recv_list.push_back(cur_op->bitmap, cur_op->req.sec_rw.attr_len);
if (cur_op->req.sec_rw.bitmap_len <= 8)
memcpy(cur_op->bitmap, &cur_op->req.sec_rw.bitmap, cur_op->req.sec_rw.bitmap_len);
else
{
cl->recv_list.push_back(cur_op->bitmap, cur_op->req.sec_rw.bitmap_len);
cl->read_remaining += cur_op->req.sec_rw.bitmap_len;
}
}
if (cur_op->req.sec_rw.len > 0)
{
cur_op->buf = memalign_or_die(MEM_ALIGNMENT, cur_op->req.sec_rw.len);
cl->recv_list.push_back(cur_op->buf, cur_op->req.sec_rw.len);
cl->read_remaining += cur_op->req.sec_rw.len;
}
cl->read_remaining = cur_op->req.sec_rw.len + cur_op->req.sec_rw.attr_len;
}
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)
@@ -302,7 +301,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
if (op->reply.hdr.opcode == OSD_OP_SEC_READ || op->reply.hdr.opcode == OSD_OP_READ)
{
// Read data. In this case we assume that the buffer is preallocated by the caller (!)
unsigned bmp_len = (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->reply.sec_rw.attr_len : op->reply.rw.bitmap_len);
unsigned bmp_len = (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->reply.sec_rw.bitmap_len : op->reply.rw.bitmap_len);
unsigned expected_size = (op->reply.hdr.opcode == OSD_OP_SEC_READ ? op->req.sec_rw.len : op->req.rw.len);
if (op->reply.hdr.retval >= 0 && (op->reply.hdr.retval != expected_size || bmp_len > op->bitmap_len))
{
@@ -316,14 +315,24 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
if (op->reply.hdr.retval >= 0 && bmp_len > 0)
{
assert(op->bitmap);
cl->recv_list.push_back(op->bitmap, bmp_len);
if (bmp_len <= 8)
{
memcpy(op->bitmap, (op->reply.hdr.opcode == OSD_OP_SEC_READ
? &op->reply.sec_rw.bitmap
: &op->reply.rw.bitmap), bmp_len);
}
else
{
cl->recv_list.push_back(op->bitmap, bmp_len);
cl->read_remaining += bmp_len;
}
}
if (op->reply.hdr.retval > 0)
{
assert(op->iov.count > 0);
cl->recv_list.append(op->iov);
cl->read_remaining += op->reply.hdr.retval;
}
cl->read_remaining = op->reply.hdr.retval + bmp_len;
if (cl->read_remaining == 0)
{
goto reuse;

View File

@@ -50,23 +50,37 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
// Bitmap
if (cur_op->op_type == OSD_OP_IN &&
cur_op->req.hdr.opcode == OSD_OP_SEC_READ &&
cur_op->reply.sec_rw.attr_len > 0)
cur_op->reply.sec_rw.bitmap_len > 0)
{
to_send_list.push_back((iovec){
.iov_base = cur_op->bitmap,
.iov_len = cur_op->reply.sec_rw.attr_len,
});
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
if (cur_op->reply.sec_rw.bitmap_len <= 8)
{
memcpy(&cur_op->reply.sec_rw.bitmap, cur_op->bitmap, cur_op->reply.sec_rw.bitmap_len);
}
else
{
to_send_list.push_back((iovec){
.iov_base = cur_op->bitmap,
.iov_len = cur_op->reply.sec_rw.bitmap_len,
});
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_BMP });
}
}
else if (cur_op->op_type == OSD_OP_OUT &&
(cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) &&
cur_op->req.sec_rw.attr_len > 0)
cur_op->req.sec_rw.bitmap_len > 0)
{
to_send_list.push_back((iovec){
.iov_base = cur_op->bitmap,
.iov_len = cur_op->req.sec_rw.attr_len,
});
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
if (cur_op->req.sec_rw.bitmap_len <= 8)
{
memcpy(&cur_op->req.sec_rw.bitmap, cur_op->bitmap, cur_op->req.sec_rw.bitmap_len);
}
else
{
to_send_list.push_back((iovec){
.iov_base = cur_op->bitmap,
.iov_len = cur_op->req.sec_rw.attr_len,
});
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_BMP });
}
}
// Operation data
if ((cur_op->op_type == OSD_OP_IN
@@ -268,18 +282,13 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
}
cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0;
#ifdef WITH_RDMA
if (cl->rdma_conn && !cl->outbox.size() && cl->peer_state == PEER_RDMA_CONNECTING)
if (cl->peer_state == PEER_RDMA_CONNECTING && cl->rdma_queues.size() > 0 && !cl->outbox.size())
{
// FIXME: Do something better than just forgetting the FD
// FIXME: Ignore pings during RDMA state transition
if (log_level > 0)
{
printf("Successfully connected with client %d using RDMA\n", cl->peer_fd);
}
printf("Successfully connected with client %d using RDMA\n", cl->peer_fd);
cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, NULL);
// Add the initial receive request
try_recv_rdma(cl);
}
#endif
}

View File

@@ -123,10 +123,11 @@ void osd_messenger_t::stop_client(int peer_fd, bool force)
// ...because peer_fd number can get reused after close()
close(peer_fd);
#ifdef WITH_RDMA
if (cl->rdma_conn)
for (auto rdma_conn: cl->rdma_queues)
{
delete cl->rdma_conn;
delete rdma_conn;
}
cl->rdma_queues.resize(0);
#endif
#endif
// Find the item again because it can be invalidated at this point

View File

@@ -116,7 +116,7 @@ public:
"Vitastor NBD proxy\n"
"(c) Vitaliy Filippov, 2020-2021 (VNPL-1.1)\n\n"
"USAGE:\n"
" %s map [--etcd_address <etcd_address>] (--image <image> | --pool <pool> --inode <inode> --size <size in bytes>)\n"
" %s map --etcd_address <etcd_address> (--image <image> | --pool <pool> --inode <inode> --size <size in bytes>)\n"
" %s unmap /dev/nbd0\n"
" %s list [--json]\n",
exe_name, exe_name, exe_name
@@ -146,6 +146,11 @@ public:
void start(json11::Json cfg)
{
// Check options
if (cfg["etcd_address"].string_value() == "")
{
fprintf(stderr, "etcd_address is missing\n");
exit(1);
}
if (cfg["image"].string_value() != "")
{
// Use image name

View File

@@ -10,39 +10,31 @@
#include "osd.h"
#include "http_client.h"
static blockstore_config_t json_to_bs(const json11::Json::object & config)
osd_t::osd_t(blockstore_config_t & config, ring_loop_t *ringloop)
{
blockstore_config_t bs;
for (auto kv: config)
{
if (kv.second.is_string())
bs[kv.first] = kv.second.string_value();
else
bs[kv.first] = kv.second.dump();
}
return bs;
}
bs_block_size = strtoull(config["block_size"].c_str(), NULL, 10);
bs_bitmap_granularity = strtoull(config["bitmap_granularity"].c_str(), NULL, 10);
if (!bs_block_size)
bs_block_size = DEFAULT_BLOCK_SIZE;
if (!bs_bitmap_granularity)
bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
clean_entry_bitmap_size = bs_block_size / bs_bitmap_granularity / 8;
osd_t::osd_t(const json11::Json & config, ring_loop_t *ringloop)
{
zero_buffer_size = 1<<20;
zero_buffer = malloc_or_die(zero_buffer_size);
memset(zero_buffer, 0, zero_buffer_size);
this->config = config;
this->ringloop = ringloop;
this->config = msgr.read_config(config).object_items();
if (this->config.find("log_level") == this->config.end())
this->config["log_level"] = 1;
parse_config(this->config);
epmgr = new epoll_manager_t(ringloop);
// FIXME: Use timerfd_interval based directly on io_uring
this->tfd = epmgr->tfd;
// FIXME: Create Blockstore from on-disk superblock config and check it against the OSD cluster config
auto bs_cfg = json_to_bs(this->config);
this->bs = new blockstore_t(bs_cfg, ringloop, tfd);
this->bs = new blockstore_t(config, ringloop, tfd);
parse_config(config);
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
{
@@ -74,71 +66,63 @@ osd_t::~osd_t()
free(zero_buffer);
}
void osd_t::parse_config(const json11::Json & config)
void osd_t::parse_config(blockstore_config_t & config)
{
st_cli.parse_config(config);
msgr.parse_config(config);
// OSD number
osd_num = config["osd_num"].uint64_value();
if (config.find("log_level") == config.end())
config["log_level"] = "1";
log_level = strtoull(config["log_level"].c_str(), NULL, 10);
// Initial startup configuration
json11::Json json_config = json11::Json(config);
st_cli.parse_config(json_config);
etcd_report_interval = strtoull(config["etcd_report_interval"].c_str(), NULL, 10);
if (etcd_report_interval <= 0)
etcd_report_interval = 30;
osd_num = strtoull(config["osd_num"].c_str(), NULL, 10);
if (!osd_num)
throw std::runtime_error("osd_num is required in the configuration");
msgr.osd_num = osd_num;
// Vital Blockstore parameters
bs_block_size = config["block_size"].uint64_value();
if (!bs_block_size)
bs_block_size = DEFAULT_BLOCK_SIZE;
bs_bitmap_granularity = config["bitmap_granularity"].uint64_value();
if (!bs_bitmap_granularity)
bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
clean_entry_bitmap_size = bs_block_size / bs_bitmap_granularity / 8;
// Bind address
bind_address = config["bind_address"].string_value();
if (bind_address == "")
bind_address = "0.0.0.0";
bind_port = config["bind_port"].uint64_value();
if (bind_port <= 0 || bind_port > 65535)
bind_port = 0;
// OSD configuration
log_level = config["log_level"].uint64_value();
etcd_report_interval = config["etcd_report_interval"].uint64_value();
if (etcd_report_interval <= 0)
etcd_report_interval = 30;
readonly = config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes";
run_primary = config["run_primary"] != "false" && config["run_primary"] != "0" && config["run_primary"] != "no";
no_rebalance = config["no_rebalance"] == "true" || config["no_rebalance"] == "1" || config["no_rebalance"] == "yes";
no_recovery = config["no_recovery"] == "true" || config["no_recovery"] == "1" || config["no_recovery"] == "yes";
allow_test_ops = config["allow_test_ops"] == "true" || config["allow_test_ops"] == "1" || config["allow_test_ops"] == "yes";
// Cluster configuration
bind_address = config["bind_address"];
if (bind_address == "")
bind_address = "0.0.0.0";
bind_port = stoull_full(config["bind_port"]);
if (bind_port <= 0 || bind_port > 65535)
bind_port = 0;
if (config["immediate_commit"] == "all")
immediate_commit = IMMEDIATE_ALL;
else if (config["immediate_commit"] == "small")
immediate_commit = IMMEDIATE_SMALL;
else
immediate_commit = IMMEDIATE_NONE;
if (!config["autosync_interval"].is_null())
if (config.find("autosync_interval") != config.end())
{
// Allow to set it to 0
autosync_interval = config["autosync_interval"].uint64_value();
autosync_interval = strtoull(config["autosync_interval"].c_str(), NULL, 10);
if (autosync_interval > MAX_AUTOSYNC_INTERVAL)
autosync_interval = DEFAULT_AUTOSYNC_INTERVAL;
}
if (!config["client_queue_depth"].is_null())
if (config.find("client_queue_depth") != config.end())
{
client_queue_depth = config["client_queue_depth"].uint64_value();
client_queue_depth = strtoull(config["client_queue_depth"].c_str(), NULL, 10);
if (client_queue_depth < 128)
client_queue_depth = 128;
}
recovery_queue_depth = config["recovery_queue_depth"].uint64_value();
recovery_queue_depth = strtoull(config["recovery_queue_depth"].c_str(), NULL, 10);
if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE)
recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
recovery_sync_batch = config["recovery_sync_batch"].uint64_value();
recovery_sync_batch = strtoull(config["recovery_sync_batch"].c_str(), NULL, 10);
if (recovery_sync_batch < 1 || recovery_sync_batch > MAX_RECOVERY_QUEUE)
recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
print_stats_interval = config["print_stats_interval"].uint64_value();
if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes")
readonly = true;
print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10);
if (!print_stats_interval)
print_stats_interval = 3;
slow_log_interval = config["slow_log_interval"].uint64_value();
slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10);
if (!slow_log_interval)
slow_log_interval = 10;
msgr.parse_config(json_config);
}
void osd_t::bind_socket()

View File

@@ -92,7 +92,7 @@ class osd_t
{
// config
json11::Json::object config;
blockstore_config_t config;
int etcd_report_interval = 30;
bool readonly = false;
@@ -167,7 +167,7 @@ class osd_t
uint64_t recovery_stat_bytes[2][2] = { 0 };
// cluster connection
void parse_config(const json11::Json & config);
void parse_config(blockstore_config_t & config);
void init_cluster();
void on_change_osd_state_hook(osd_num_t peer_osd);
void on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num);
@@ -268,7 +268,7 @@ class osd_t
}
public:
osd_t(const json11::Json & config, ring_loop_t *ringloop);
osd_t(blockstore_config_t & config, ring_loop_t *ringloop);
~osd_t();
void force_stop(int exitcode);
bool shutdown();

View File

@@ -21,7 +21,7 @@ void osd_t::init_cluster()
{
// Test version of clustering code with 1 pool, 1 PG and 2 peers
// Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205
std::string peerstr = config["peers"].string_value();
std::string peerstr = config["peers"];
while (peerstr.size())
{
int pos = peerstr.find(',');
@@ -340,10 +340,21 @@ void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num)
void osd_t::on_load_config_hook(json11::Json::object & global_config)
{
json11::Json::object osd_config = this->config;
for (auto & kv: global_config)
if (osd_config.find(kv.first) == osd_config.end())
osd_config[kv.first] = kv.second;
blockstore_config_t osd_config = this->config;
for (auto & cfg_var: global_config)
{
if (this->config.find(cfg_var.first) == this->config.end())
{
if (cfg_var.second.is_string())
{
osd_config[cfg_var.first] = cfg_var.second.string_value();
}
else
{
osd_config[cfg_var.first] = cfg_var.second.dump();
}
}
}
parse_config(osd_config);
bind_socket();
acquire_lease();
@@ -369,7 +380,7 @@ void osd_t::acquire_lease()
etcd_lease_id = data["ID"].string_value();
create_osd_state();
});
printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, config["etcd_address"].string_value().c_str(), etcd_report_interval);
printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, config["etcd_address"].c_str(), etcd_report_interval);
tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id)
{
renew_lease();

View File

@@ -29,13 +29,13 @@ int main(int narg, char *args[])
perror("BUG: too small packet size");
return 1;
}
json11::Json::object config;
blockstore_config_t config;
for (int i = 1; i < narg; i++)
{
if (args[i][0] == '-' && args[i][1] == '-' && i < narg-1)
{
char *opt = args[i]+2;
config[std::string(opt)] = std::string(args[++i]);
config[opt] = args[++i];
}
}
signal(SIGINT, handle_sigint);

View File

@@ -35,7 +35,7 @@
#define MEM_ALIGNMENT 512
#endif
#define OSD_RW_MAX 64*1024*1024
#define OSD_PROTOCOL_VERSION 1
#define OSD_PROTOCOL_VERSION 2
// common request and reply headers
struct __attribute__((__packed__)) osd_op_header_t
@@ -74,8 +74,10 @@ struct __attribute__((__packed__)) osd_op_sec_rw_t
// length
uint32_t len;
// bitmap/attribute length - bitmap comes after header, but before data
uint32_t attr_len;
uint32_t bitmap_len;
uint32_t pad0;
// inline bitmap (when it's no longer than 8 bytes)
uint64_t bitmap;
};
struct __attribute__((__packed__)) osd_reply_sec_rw_t
@@ -84,8 +86,10 @@ struct __attribute__((__packed__)) osd_reply_sec_rw_t
// for reads and writes: assigned or read version number
uint64_t version;
// for reads: bitmap/attribute length (just to double-check)
uint32_t attr_len;
uint32_t bitmap_len;
uint32_t pad0;
// inline bitmap (when it's no longer than 8 bytes)
uint64_t bitmap;
};
// delete object on the secondary OSD
@@ -199,6 +203,8 @@ struct __attribute__((__packed__)) osd_reply_rw_t
// for reads: bitmap length
uint32_t bitmap_len;
uint32_t pad0;
// inline bitmap (when it's no longer than 8 bytes)
uint64_t bitmap;
};
// sync to the primary OSD

View File

@@ -235,7 +235,10 @@ resume_2:
{
reconstruct_stripes_jerasure(stripes, op_data->pg_size, op_data->pg_data_size, clean_entry_bitmap_size);
}
cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
if (cur_op->reply.rw.bitmap_len <= 8)
memcpy(&cur_op->reply.rw.bitmap, op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
else
cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
for (int role = 0; role < op_data->pg_size; role++)
{
if (stripes[role].req_end != 0)
@@ -250,7 +253,10 @@ resume_2:
}
else
{
cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
if (cur_op->reply.rw.bitmap_len <= 8)
memcpy(&cur_op->reply.rw.bitmap, op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
else
cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
cur_op->iov.push_back(cur_op->buf, cur_op->req.rw.len);
}
finish_op(cur_op, cur_op->req.rw.len);

View File

@@ -200,7 +200,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
.version = op_version,
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
.attr_len = wr ? clean_entry_bitmap_size : 0,
.bitmap_len = wr ? clean_entry_bitmap_size : 0,
};
#ifdef OSD_DEBUG
printf(

View File

@@ -20,9 +20,9 @@ void osd_t::secondary_op_callback(osd_op_t *op)
if (op->req.hdr.opcode == OSD_OP_SEC_READ)
{
if (op->bs_op->retval >= 0)
op->reply.sec_rw.attr_len = clean_entry_bitmap_size;
op->reply.sec_rw.bitmap_len = clean_entry_bitmap_size;
else
op->reply.sec_rw.attr_len = 0;
op->reply.sec_rw.bitmap_len = 0;
if (op->bs_op->retval > 0)
op->iov.push_back(op->buf, op->bs_op->retval);
}
@@ -81,7 +81,7 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ)
{
// Allocate memory for the read operation
if (clean_entry_bitmap_size > sizeof(unsigned))
if (clean_entry_bitmap_size > sizeof(void*))
cur_op->bitmap = cur_op->rmw_buf = malloc_or_die(clean_entry_bitmap_size);
else
cur_op->bitmap = &cur_op->bmp_data;
@@ -166,15 +166,20 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
{
// Indicate that RDMA is enabled
wire_config["rdma_enabled"] = true;
if (req_json["connect_rdma"].is_string())
if (req_json["rdma_queues"].array_items().size())
{
// Peer is trying to connect using RDMA, try to satisfy him
bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value(), req_json["rdma_max_msg"].uint64_value());
auto cl = msgr.clients.at(cur_op->peer_fd);
bool ok = msgr.connect_rdma_client(cl, req_json["rdma_queues"], req_json["rdma_max_sge"].uint64_value());
if (ok)
{
auto rc = msgr.clients.at(cur_op->peer_fd)->rdma_conn;
wire_config["rdma_address"] = rc->addr.to_string();
wire_config["rdma_max_msg"] = rc->max_msg;
json11::Json::array rdma_queues;
for (auto rdma_conn: cl->rdma_queues)
{
rdma_queues.push_back(rdma_conn->addr.to_string());
}
wire_config["rdma_queues"] = rdma_queues;
wire_config["rdma_max_sge"] = msgr.get_rdma_max_sge();
}
}
}

View File

@@ -40,7 +40,6 @@ typedef struct VitastorClient
{
void *proxy;
void *watch;
char *config_path;
char *etcd_host;
char *etcd_prefix;
char *image;
@@ -48,10 +47,6 @@ typedef struct VitastorClient
uint64_t pool;
uint64_t size;
long readonly;
char *rdma_device;
int rdma_port_num;
int rdma_gid_index;
int rdma_mtu;
QemuMutex mutex;
} VitastorClient;
@@ -100,8 +95,7 @@ static void qemu_rbd_unescape(char *src)
}
// vitastor[:key=value]*
// vitastor[:etcd_host=127.0.0.1]:inode=1:pool=1[:rdma_gid_index=3]
// vitastor:config_path=/etc/vitastor/vitastor.conf:image=testimg
// vitastor:etcd_host=127.0.0.1:inode=1:pool=1
static void vitastor_parse_filename(const char *filename, QDict *options, Error **errp)
{
const char *start;
@@ -129,12 +123,7 @@ static void vitastor_parse_filename(const char *filename, QDict *options, Error
qemu_rbd_unescape(name);
value = qemu_rbd_next_tok(p, ':', &p);
qemu_rbd_unescape(value);
if (!strcmp(name, "inode") ||
!strcmp(name, "pool") ||
!strcmp(name, "size") ||
!strcmp(name, "rdma_port_num") ||
!strcmp(name, "rdma_gid_index") ||
!strcmp(name, "rdma_mtu"))
if (!strcmp(name, "inode") || !strcmp(name, "pool") || !strcmp(name, "size"))
{
unsigned long long num_val;
if (parse_uint_full(value, &num_val, 0))
@@ -168,6 +157,11 @@ static void vitastor_parse_filename(const char *filename, QDict *options, Error
goto out;
}
}
if (!qdict_get_str(options, "etcd_host"))
{
error_setg(errp, "etcd_host is missing");
goto out;
}
out:
g_free(buf);
@@ -195,17 +189,9 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
VitastorClient *client = bs->opaque;
int64_t ret = 0;
qemu_mutex_init(&client->mutex);
client->config_path = g_strdup(qdict_get_try_str(options, "config_path"));
client->etcd_host = g_strdup(qdict_get_try_str(options, "etcd_host"));
client->etcd_prefix = g_strdup(qdict_get_try_str(options, "etcd_prefix"));
client->rdma_device = g_strdup(qdict_get_try_str(options, "rdma_device"));
client->rdma_port_num = qdict_get_try_int(options, "rdma_port_num", 0);
client->rdma_gid_index = qdict_get_try_int(options, "rdma_gid_index", 0);
client->rdma_mtu = qdict_get_try_int(options, "rdma_mtu", 0);
client->proxy = vitastor_proxy_create(
bdrv_get_aio_context(bs), client->config_path, client->etcd_host, client->etcd_prefix,
client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu
);
client->proxy = vitastor_proxy_create(bdrv_get_aio_context(bs), client->etcd_host, client->etcd_prefix);
client->image = g_strdup(qdict_get_try_str(options, "image"));
client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0;
if (client->image)
@@ -255,11 +241,6 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
}
bs->total_sectors = client->size / BDRV_SECTOR_SIZE;
//client->aio_context = bdrv_get_aio_context(bs);
qdict_del(options, "rdma_mtu");
qdict_del(options, "rdma_gid_index");
qdict_del(options, "rdma_port_num");
qdict_del(options, "rdma_device");
qdict_del(options, "config_path");
qdict_del(options, "etcd_host");
qdict_del(options, "etcd_prefix");
qdict_del(options, "image");
@@ -274,10 +255,7 @@ static void vitastor_close(BlockDriverState *bs)
VitastorClient *client = bs->opaque;
vitastor_proxy_destroy(client->proxy);
qemu_mutex_destroy(&client->mutex);
if (client->config_path)
g_free(client->config_path);
if (client->etcd_host)
g_free(client->etcd_host);
g_free(client->etcd_host);
if (client->etcd_prefix)
g_free(client->etcd_prefix);
if (client->image)
@@ -500,7 +478,6 @@ static QEMUOptionParameter vitastor_create_opts[] = {
static const char *vitastor_strong_runtime_opts[] = {
"inode",
"pool",
"config_path",
"etcd_host",
"etcd_prefix",

View File

@@ -34,28 +34,15 @@ public:
cluster_client_t *cli;
AioContext *ctx;
QemuProxy(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu)
QemuProxy(AioContext *ctx, const char *etcd_host, const char *etcd_prefix)
{
this->ctx = ctx;
json11::Json::object cfg;
if (config_path)
cfg["config_path"] = std::string(config_path);
if (etcd_host)
cfg["etcd_address"] = std::string(etcd_host);
if (etcd_prefix)
cfg["etcd_prefix"] = std::string(etcd_prefix);
if (rdma_device)
cfg["rdma_device"] = std::string(rdma_device);
if (rdma_port_num)
cfg["rdma_port_num"] = rdma_port_num;
if (rdma_gid_index)
cfg["rdma_gid_index"] = rdma_gid_index;
if (rdma_mtu)
cfg["rdma_mtu"] = rdma_mtu;
json11::Json cfg_json(cfg);
json11::Json cfg = json11::Json::object {
{ "etcd_address", std::string(etcd_host) },
{ "etcd_prefix", std::string(etcd_prefix ? etcd_prefix : "/vitastor") },
};
tfd = new timerfd_manager_t([this](int fd, bool wr, std::function<void(int, int)> callback) { set_fd_handler(fd, wr, callback); });
cli = new cluster_client_t(NULL, tfd, cfg_json);
cli = new cluster_client_t(NULL, tfd, cfg);
}
~QemuProxy()
@@ -93,10 +80,9 @@ public:
extern "C" {
void* vitastor_proxy_create(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu)
void* vitastor_proxy_create(AioContext *ctx, const char *etcd_host, const char *etcd_prefix)
{
QemuProxy *p = new QemuProxy(ctx, config_path, etcd_host, etcd_prefix, rdma_device, rdma_port_num, rdma_gid_index, rdma_mtu);
QemuProxy *p = new QemuProxy(ctx, etcd_host, etcd_prefix);
return p;
}

View File

@@ -16,8 +16,7 @@ extern "C" {
// Our exports
typedef void VitastorIOHandler(long retval, void *opaque);
void* vitastor_proxy_create(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu);
void* vitastor_proxy_create(AioContext *ctx, const char *etcd_host, const char *etcd_prefix);
void vitastor_proxy_destroy(void *client);
void vitastor_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque);

View File

@@ -87,7 +87,7 @@ public:
"Vitastor inode removal tool\n"
"(c) Vitaliy Filippov, 2020 (VNPL-1.1)\n\n"
"USAGE:\n"
" %s [--etcd_address <etcd_address>] --pool <pool> --inode <inode> [--wait-list]\n",
" %s --etcd_address <etcd_address> --pool <pool> --inode <inode> [--wait-list]\n",
exe_name
);
exit(0);
@@ -95,6 +95,11 @@ public:
void run(json11::Json cfg)
{
if (cfg["etcd_address"].string_value() == "")
{
fprintf(stderr, "etcd_address is missing\n");
exit(1);
}
inode = cfg["inode"].uint64_value();
pool_id = cfg["pool"].uint64_value();
if (pool_id)

View File

@@ -46,7 +46,7 @@ $ETCDCTL put /vitastor/config/inode/1/1 '{"name":"debian9@0","size":'$((2048*102
$ETCDCTL put /vitastor/config/inode/1/2 '{"parent_id":1,"name":"debian9","size":'$((2048*1024*1024))'}'
qemu-system-x86_64 -enable-kvm -m 1024 \
-drive 'file=vitastor:etcd_host=127.0.0.1\:'$ETCD_PORT'/v3:image=debian9',format=raw,if=none,id=drive-virtio-disk0,cache=none \
-drive 'file=vitastor:etcd_host=127.0.0.1\:$ETCD_PORT/v3:image=debian9',format=raw,if=none,id=drive-virtio-disk0,cache=none \
-device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x5,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1,write-cache=off,physical_block_size=4096,logical_block_size=512 \
-vnc 0.0.0.0:0