diff --git a/CMakeLists.txt b/CMakeLists.txt index 1aa69555..3d1da711 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,4 +12,6 @@ else() project(vitastor VERSION "${VERSION_STRING}") endif() +set(VERSION "0.6.3") + add_subdirectory(src) diff --git a/README-ru.md b/README-ru.md index dc22a707..a18db93e 100644 --- a/README-ru.md +++ b/README-ru.md @@ -49,6 +49,7 @@ Vitastor на данный момент находится в статусе п - Именование инодов через хранение их метаданных в etcd - Снапшоты и copy-on-write клоны - Сглаживание производительности случайной записи в SSD+HDD конфигурациях +- Поддержка RDMA/RoCEv2 через libibverbs ## Планы развития @@ -60,7 +61,7 @@ Vitastor на данный момент находится в статусе п - Фоновая проверка целостности без контрольных сумм (сверка реплик) - Контрольные суммы - Поддержка SSD-кэширования (tiered storage) -- Поддержка RDMA и NVDIMM +- Поддержка NVDIMM - Web-интерфейс - Возможно, сжатие - Возможно, поддержка кэширования данных через системный page cache @@ -314,14 +315,15 @@ Ceph: ### NBD -NBD - на данный момент единственный способ монтировать Vitastor ядром Linux, но он -приводит к дополнительным копированиям данных, поэтому немного ухудшает производительность, -правда, в основном - линейную, а случайная затрагивается слабо. - NBD расшифровывается как "сетевое блочное устройство", но на самом деле оно также работает просто как аналог FUSE для блочных устройств, то есть, представляет собой "блочное устройство в пространстве пользователя". +NBD - на данный момент единственный способ монтировать Vitastor ядром Linux. +NBD немного снижает производительность, так как приводит к дополнительным копированиям +данных между ядром и пространством пользователя. Тем не менее, способ достаточно оптимален, +а производительность случайного доступа вообще затрагивается слабо. 
+ Vitastor с однопоточной NBD прокси на том же стенде: - T1Q1 запись: 6000 iops (задержка 0.166ms) - T1Q1 чтение: 5518 iops (задержка 0.18ms) @@ -424,23 +426,90 @@ Vitastor с однопоточной NBD прокси на том же стен - Запустите все OSD: `systemctl start vitastor.target` - Ваш кластер должен быть готов - один из мониторов должен уже сконфигурировать PG, а OSD должны запустить их. - Вы можете проверить состояние PG прямо в etcd: `etcdctl --endpoints=... get --prefix /vitastor/pg/state`. Все PG должны быть 'active'. -- Пример команды для запуска тестов: `fio -thread -ioengine=libfio_vitastor.so -name=test -bs=4M -direct=1 -iodepth=16 -rw=write -etcd=10.115.0.10:2379/v3 -pool=1 -inode=1 -size=400G`. -- Пример команды для заливки образа ВМ в vitastor через qemu-img: - ``` - qemu-img convert -f qcow2 debian10.qcow2 -p -O raw 'vitastor:etcd_host=10.115.0.10\:2379/v3:pool=1:inode=1:size=2147483648' - ``` - Если вы используете немодифицированный QEMU, данной команде потребуется переменная окружения `LD_PRELOAD=/usr/lib/x86_64-linux-gnu/qemu/block-vitastor.so`. -- Пример команды запуска QEMU: - ``` - qemu-system-x86_64 -enable-kvm -m 1024 - -drive 'file=vitastor:etcd_host=10.115.0.10\:2379/v3:pool=1:inode=1:size=2147483648',format=raw,if=none,id=drive-virtio-disk0,cache=none - -device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x5,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1,write-cache=off,physical_block_size=4096,logical_block_size=512 - -vnc 0.0.0.0:0 - ``` -- Пример команды удаления образа (инода) из Vitastor: - ``` - vitastor-rm --etcd_address 10.115.0.10:2379/v3 --pool 1 --inode 1 --parallel_osds 16 --iodepth 32 - ``` + +### Задать имя образу + +``` +etcdctl --endpoints= put /vitastor/config/inode// '{"name":"","size":[,"parent_id":][,"readonly":true]}' +``` + +Например: + +``` +etcdctl --endpoints=http://10.115.0.10:2379/v3 put /vitastor/config/inode/1/1 '{"name":"testimg","size":2147483648}' +``` + +Если вы зададите parent_id, то образ станет CoW-клоном, т.е. 
все новые запросы записи пойдут в новый инод, а запросы +чтения будут проверять сначала его, а потом родительские слои по цепочке вверх. Чтобы случайно не перезаписать данные +в родительском слое, вы можете переключить его в режим "только чтение", добавив флаг `"readonly":true` в его запись +метаданных. В таком случае родительский образ становится просто снапшотом. + +Таким образом, для создания снапшота вам нужно просто переименовать предыдущий inode (например, из testimg в testimg@0), +сделать его readonly и создать новый слой с исходным именем образа (testimg), ссылающийся на только что переименованный +в качестве родительского. + +### Запуск тестов с fio + +Пример команды для запуска тестов: + +``` +fio -thread -ioengine=libfio_vitastor.so -name=test -bs=4M -direct=1 -iodepth=16 -rw=write -etcd=10.115.0.10:2379/v3 -image=testimg +``` + +Если вы не хотите обращаться к образу по имени, вместо `-image=testimg` можно указать номер пула, номер инода и размер: +`-pool=1 -inode=1 -size=400G`. + +### Загрузить образ диска ВМ в/из Vitastor + +Используйте qemu-img и строку `vitastor:etcd_host=:image=` в качестве имени файла диска. Например: + +``` +qemu-img convert -f qcow2 debian10.qcow2 -p -O raw 'vitastor:etcd_host=10.115.0.10\:2379/v3:image=testimg' +``` + +Обратите внимание, что если вы используете немодифицированный QEMU, потребуется установить переменную окружения +`LD_PRELOAD=/usr/lib/x86_64-linux-gnu/qemu/block-vitastor.so`. + +Если вы не хотите обращаться к образу по имени, вместо `:image=` можно указать номер пула, номер инода и размер: +`:pool=:inode=:size=`. + +### Запустить ВМ + +Для запуска QEMU используйте опцию `-drive file=vitastor:etcd_host=:image=` (аналогично qemu-img) +и физический размер блока 4 KB. 
+ +Например: + +``` +qemu-system-x86_64 -enable-kvm -m 1024 + -drive 'file=vitastor:etcd_host=10.115.0.10\:2379/v3:image=testimg',format=raw,if=none,id=drive-virtio-disk0,cache=none + -device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x5,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1,write-cache=off,physical_block_size=4096,logical_block_size=512 + -vnc 0.0.0.0:0 +``` + +Обращение по номерам (`:pool=:inode=:size=` вместо `:image=`) работает аналогично qemu-img. + +### Удалить образ + +Используйте утилиту vitastor-rm. Например: + +``` +vitastor-rm --etcd_address 10.115.0.10:2379/v3 --pool 1 --inode 1 --parallel_osds 16 --iodepth 32 +``` + +### NBD + +Чтобы создать локальное блочное устройство, используйте NBD. Например: + +``` +vitastor-nbd map --etcd_address 10.115.0.10:2379/v3 --image testimg +``` + +Команда напечатает название устройства вида /dev/nbd0, которое потом можно будет форматировать +и использовать как обычное блочное устройство. + +Для обращения по номеру инода, аналогично другим командам, можно использовать опции +`--pool --inode --size ` вместо `--image testimg`. ## Известные проблемы diff --git a/README.md b/README.md index 38c2dda7..66f0c06d 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ breaking changes in the future. However, the following is implemented: - Inode metadata storage in etcd - Snapshots and copy-on-write image clones - Write throttling to smooth random write workloads in SSD+HDD configurations +- RDMA/RoCEv2 support via libibverbs ## Roadmap @@ -54,7 +55,7 @@ breaking changes in the future. However, the following is implemented: - Scrubbing without checksums (verification of replicas) - Checksums - Tiered storage -- RDMA and NVDIMM support +- NVDIMM support - Web GUI - Compression (possibly) - Read caching using system page cache (possibly) @@ -379,24 +380,86 @@ and calculate disk offsets almost by hand. This will be fixed in near future. 
For jerasure pools the configuration should look like the following: `2:{"name":"ecpool","scheme":"jerasure","pg_size":4,"parity_chunks":2,"pg_minsize":2,"pg_count":256,"failure_domain":"host"}`. - At this point, one of the monitors will configure PGs and OSDs will start them. - You can check PG states with `etcdctl --endpoints=... get --prefix /vitastor/pg/state`. All PGs should become 'active'. -- Run tests with (for example): `fio -thread -ioengine=libfio_vitastor.so -name=test -bs=4M -direct=1 -iodepth=16 -rw=write -etcd=10.115.0.10:2379/v3 -pool=1 -inode=1 -size=400G`. -- Upload VM disk image with qemu-img (for example): - ``` - qemu-img convert -f qcow2 debian10.qcow2 -p -O raw 'vitastor:etcd_host=10.115.0.10\:2379/v3:pool=1:inode=1:size=2147483648' - ``` - Note that the command requires to be run with `LD_PRELOAD=/usr/lib/x86_64-linux-gnu/qemu/block-vitastor.so qemu-img ...` - if you use unmodified QEMU. -- Run QEMU with (for example): - ``` - qemu-system-x86_64 -enable-kvm -m 1024 - -drive 'file=vitastor:etcd_host=10.115.0.10\:2379/v3:pool=1:inode=1:size=2147483648',format=raw,if=none,id=drive-virtio-disk0,cache=none - -device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x5,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1,write-cache=off,physical_block_size=4096,logical_block_size=512 - -vnc 0.0.0.0:0 - ``` -- Remove inode with (for example): - ``` - vitastor-rm --etcd_address 10.115.0.10:2379/v3 --pool 1 --inode 1 --parallel_osds 16 --iodepth 32 - ``` + +### Name an image + +``` +etcdctl --endpoints= put /vitastor/config/inode// '{"name":"","size":[,"parent_id":][,"readonly":true]}' +``` + +For example: + +``` +etcdctl --endpoints=http://10.115.0.10:2379/v3 put /vitastor/config/inode/1/1 '{"name":"testimg","size":2147483648}' +``` + +If you specify parent_id the image becomes a CoW clone. I.e. all writes go to the new inode and reads first check it +and then upper layers. 
You can then make parent readonly by updating its entry with `"readonly":true` for safety and +basically treat it as a snapshot. + +So to create a snapshot you basically rename the previous upper layer (for example from testimg to testimg@0), make it readonly +and create a new top layer with the original name (testimg) and the previous one as a parent. + +### Run fio benchmarks + +fio command example: + +``` +fio -thread -ioengine=libfio_vitastor.so -name=test -bs=4M -direct=1 -iodepth=16 -rw=write -etcd=10.115.0.10:2379/v3 -image=testimg +``` + +If you don't want to access your image by name, you can specify pool number, inode number and size +(`-pool=1 -inode=1 -size=400G`) instead of the image name (`-image=testimg`). + +### Upload VM image + +Use qemu-img and `vitastor:etcd_host=:image=` disk filename. For example: + +``` +qemu-img convert -f qcow2 debian10.qcow2 -p -O raw 'vitastor:etcd_host=10.115.0.10\:2379/v3:image=testimg' +``` + +Note that the command requires to be run with `LD_PRELOAD=/usr/lib/x86_64-linux-gnu/qemu/block-vitastor.so qemu-img ...` +if you use unmodified QEMU. + +You can also specify `:pool=:inode=:size=` instead of `:image=` +if you don't want to use inode metadata. + +### Start a VM + +Run QEMU with `-drive file=vitastor:etcd_host=:image=` and use 4 KB physical block size. + +For example: + +``` +qemu-system-x86_64 -enable-kvm -m 1024 + -drive 'file=vitastor:etcd_host=10.115.0.10\:2379/v3:image=testimg',format=raw,if=none,id=drive-virtio-disk0,cache=none + -device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x5,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1,write-cache=off,physical_block_size=4096,logical_block_size=512 + -vnc 0.0.0.0:0 +``` + +You can also specify `:pool=:inode=:size=` instead of `:image=`, +just like in qemu-img. + +### Remove inode + +Use vitastor-rm. 
For example: + +``` +vitastor-rm --etcd_address 10.115.0.10:2379/v3 --pool 1 --inode 1 --parallel_osds 16 --iodepth 32 +``` + +### NBD + +To create a local block device for a Vitastor image, use NBD. For example: + +``` +vitastor-nbd map --etcd_address 10.115.0.10:2379/v3 --image testimg +``` + +It will output the device name, like /dev/nbd0 which you can then format and mount as a normal block device. + +Again, you can use `--pool --inode --size ` instead of `--image ` if you want. ## Known Problems diff --git a/debian/changelog b/debian/changelog index 0c8dc7af..73df54df 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,8 +1,18 @@ -vitastor (0.6.2-1) unstable; urgency=medium +vitastor (0.6.3-1) unstable; urgency=medium + * RDMA support * Bugfixes - -- Vitaliy Filippov Tue, 02 Feb 2021 23:01:24 +0300 + -- Vitaliy Filippov Sat, 01 May 2021 18:46:10 +0300 + +vitastor (0.6.0-1) unstable; urgency=medium + + * Snapshots and Copy-on-Write clones + * Image metadata in etcd (name, size) + * Image I/O and space statistics in etcd + * Write throttling for smoothing random write workloads in SSD+HDD configurations + + -- Vitaliy Filippov Sun, 11 Apr 2021 00:49:18 +0300 vitastor (0.5.1-1) unstable; urgency=medium diff --git a/debian/control b/debian/control index ba944ea2..ba8dab4d 100644 --- a/debian/control +++ b/debian/control @@ -2,7 +2,7 @@ Source: vitastor Section: admin Priority: optional Maintainer: Vitaliy Filippov -Build-Depends: debhelper, liburing-dev (>= 0.6), g++ (>= 8), libstdc++6 (>= 8), linux-libc-dev, libgoogle-perftools-dev, libjerasure-dev, libgf-complete-dev +Build-Depends: debhelper, liburing-dev (>= 0.6), g++ (>= 8), libstdc++6 (>= 8), linux-libc-dev, libgoogle-perftools-dev, libjerasure-dev, libgf-complete-dev, libibverbs-dev Standards-Version: 4.5.0 Homepage: https://vitastor.io/ Rules-Requires-Root: no diff --git a/debian/vitastor.Dockerfile b/debian/vitastor.Dockerfile index 5320eef1..bea70ae4 100644 --- a/debian/vitastor.Dockerfile +++ 
b/debian/vitastor.Dockerfile @@ -22,7 +22,7 @@ RUN apt-get -y build-dep qemu RUN apt-get -y build-dep fio RUN apt-get --download-only source qemu RUN apt-get --download-only source fio -RUN apt-get -y install libjerasure-dev cmake +RUN apt-get update && apt-get -y install libjerasure-dev cmake libibverbs-dev ADD . /root/vitastor RUN set -e -x; \ @@ -40,10 +40,10 @@ RUN set -e -x; \ mkdir -p /root/packages/vitastor-$REL; \ rm -rf /root/packages/vitastor-$REL/*; \ cd /root/packages/vitastor-$REL; \ - cp -r /root/vitastor vitastor-0.6.2; \ - ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.6.2/qemu; \ - ln -s /root/fio-build/fio-*/ vitastor-0.6.2/fio; \ - cd vitastor-0.6.2; \ + cp -r /root/vitastor vitastor-0.6.3; \ + ln -s /root/packages/qemu-$REL/qemu-*/ vitastor-0.6.3/qemu; \ + ln -s /root/fio-build/fio-*/ vitastor-0.6.3/fio; \ + cd vitastor-0.6.3; \ FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \ QEMU=$(head -n1 qemu/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \ sh copy-qemu-includes.sh; \ @@ -59,8 +59,8 @@ RUN set -e -x; \ echo "dep:fio=$FIO" > debian/substvars; \ echo "dep:qemu=$QEMU" >> debian/substvars; \ cd /root/packages/vitastor-$REL; \ - tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.2.orig.tar.xz vitastor-0.6.2; \ - cd vitastor-0.6.2; \ + tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.3.orig.tar.xz vitastor-0.6.3; \ + cd vitastor-0.6.3; \ V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \ DEBFULLNAME="Vitaliy Filippov " dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \ DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \ diff --git a/mon/mon.js b/mon/mon.js index eb892989..d8ea083c 100644 --- a/mon/mon.js +++ b/mon/mon.js @@ -41,6 +41,12 @@ const etcd_allow = new RegExp('^'+[ const etcd_tree = { config: { /* global: { + // WARNING: NOT ALL OF THESE ARE ACTUALLY CONFIGURABLE HERE + // THIS IS JUST A 
POOR MAN'S CONFIG DOCUMENTATION + // etcd connection + config_path: "/etc/vitastor/vitastor.conf", + etcd_address: "10.0.115.10:2379/v3", + etcd_prefix: "/vitastor", // mon etcd_mon_ttl: 30, // min: 10 etcd_mon_timeout: 1000, // ms. min: 0 @@ -50,7 +56,17 @@ const etcd_tree = { osd_out_time: 600, // seconds. min: 0 placement_levels: { datacenter: 1, rack: 2, host: 3, osd: 4, ... }, // client and osd + tcp_header_buffer_size: 65536, use_sync_send_recv: false, + use_rdma: true, + rdma_device: null, // for example, "rocep5s0f0" + rdma_port_num: 1, + rdma_gid_index: 0, + rdma_mtu: 4096, + rdma_max_sge: 128, + rdma_max_send: 32, + rdma_max_recv: 8, + rdma_max_msg: 1048576, log_level: 0, block_size: 131072, disk_alignment: 4096, diff --git a/rpm/build-tarball.sh b/rpm/build-tarball.sh index e336625d..4d36500a 100755 --- a/rpm/build-tarball.sh +++ b/rpm/build-tarball.sh @@ -48,4 +48,4 @@ FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Ve QEMU=`rpm -qi qemu qemu-kvm | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'` perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec perl -i -pe 's/(Requires:\s*qemu(?:-kvm)?)([^\n]+)?/$1 = '$QEMU'/' $VITASTOR/rpm/vitastor-el$EL.spec -tar --transform 's#^#vitastor-0.6.2/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.2$(rpm --eval '%dist').tar.gz * +tar --transform 's#^#vitastor-0.6.3/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.3$(rpm --eval '%dist').tar.gz * diff --git a/rpm/vitastor-el7.Dockerfile b/rpm/vitastor-el7.Dockerfile index c4369239..bd88fee4 100644 --- a/rpm/vitastor-el7.Dockerfile +++ b/rpm/vitastor-el7.Dockerfile @@ -17,6 +17,7 @@ RUN rpm --nomd5 -i fio*.src.rpm RUN rm -f /etc/yum.repos.d/CentOS-Media.repo RUN cd ~/rpmbuild/SPECS && yum-builddep -y --enablerepo='*' --disablerepo=centos-sclo-rh --disablerepo=centos-sclo-rh-source 
--disablerepo=centos-sclo-sclo-testing qemu-kvm.spec RUN cd ~/rpmbuild/SPECS && yum-builddep -y --enablerepo='*' --disablerepo=centos-sclo-rh --disablerepo=centos-sclo-rh-source --disablerepo=centos-sclo-sclo-testing fio.spec +RUN yum -y install rdma-core-devel ADD https://vitastor.io/rpms/liburing-el7/liburing-0.7-2.el7.src.rpm /root @@ -37,7 +38,7 @@ ADD . /root/vitastor RUN set -e; \ cd /root/vitastor/rpm; \ sh build-tarball.sh; \ - cp /root/vitastor-0.6.2.el7.tar.gz ~/rpmbuild/SOURCES; \ + cp /root/vitastor-0.6.3.el7.tar.gz ~/rpmbuild/SOURCES; \ cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \ cd ~/rpmbuild/SPECS/; \ rpmbuild -ba vitastor.spec; \ diff --git a/rpm/vitastor-el7.spec b/rpm/vitastor-el7.spec index 4c4058b5..6d1e6532 100644 --- a/rpm/vitastor-el7.spec +++ b/rpm/vitastor-el7.spec @@ -1,11 +1,11 @@ Name: vitastor -Version: 0.6.2 +Version: 0.6.3 Release: 1%{?dist} Summary: Vitastor, a fast software-defined clustered block storage License: Vitastor Network Public License 1.1 URL: https://vitastor.io/ -Source0: vitastor-0.6.2.el7.tar.gz +Source0: vitastor-0.6.3.el7.tar.gz BuildRequires: liburing-devel >= 0.6 BuildRequires: gperftools-devel @@ -14,6 +14,7 @@ BuildRequires: rh-nodejs12 BuildRequires: rh-nodejs12-npm BuildRequires: jerasure-devel BuildRequires: gf-complete-devel +BuildRequires: libibverbs-devel BuildRequires: cmake Requires: fio = 3.7-1.el7 Requires: qemu-kvm = 2.0.0-1.el7.6 @@ -61,8 +62,8 @@ cp -r mon %buildroot/usr/lib/vitastor/mon %_libdir/libfio_vitastor.so %_libdir/libfio_vitastor_blk.so %_libdir/libfio_vitastor_sec.so -%_libdir/libvitastor_blk.so -%_libdir/libvitastor_client.so +%_libdir/libvitastor_blk.so* +%_libdir/libvitastor_client.so* /usr/lib/vitastor diff --git a/rpm/vitastor-el8.Dockerfile b/rpm/vitastor-el8.Dockerfile index eacc8f31..8c1712ac 100644 --- a/rpm/vitastor-el8.Dockerfile +++ b/rpm/vitastor-el8.Dockerfile @@ -15,6 +15,7 @@ RUN rpm --nomd5 -i qemu*.src.rpm RUN rpm --nomd5 -i fio*.src.rpm RUN cd 
~/rpmbuild/SPECS && dnf builddep -y --enablerepo=powertools --spec qemu-kvm.spec RUN cd ~/rpmbuild/SPECS && dnf builddep -y --enablerepo=powertools --spec fio.spec && dnf install -y cmake +RUN yum -y install libibverbs-devel ADD https://vitastor.io/rpms/liburing-el7/liburing-0.7-2.el7.src.rpm /root @@ -35,7 +36,7 @@ ADD . /root/vitastor RUN set -e; \ cd /root/vitastor/rpm; \ sh build-tarball.sh; \ - cp /root/vitastor-0.6.2.el8.tar.gz ~/rpmbuild/SOURCES; \ + cp /root/vitastor-0.6.3.el8.tar.gz ~/rpmbuild/SOURCES; \ cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \ cd ~/rpmbuild/SPECS/; \ rpmbuild -ba vitastor.spec; \ diff --git a/rpm/vitastor-el8.spec b/rpm/vitastor-el8.spec index cf89fdba..f0e5ec54 100644 --- a/rpm/vitastor-el8.spec +++ b/rpm/vitastor-el8.spec @@ -1,11 +1,11 @@ Name: vitastor -Version: 0.6.2 +Version: 0.6.3 Release: 1%{?dist} Summary: Vitastor, a fast software-defined clustered block storage License: Vitastor Network Public License 1.1 URL: https://vitastor.io/ -Source0: vitastor-0.6.2.el8.tar.gz +Source0: vitastor-0.6.3.el8.tar.gz BuildRequires: liburing-devel >= 0.6 BuildRequires: gperftools-devel @@ -13,6 +13,7 @@ BuildRequires: gcc-toolset-9-gcc-c++ BuildRequires: nodejs >= 10 BuildRequires: jerasure-devel BuildRequires: gf-complete-devel +BuildRequires: libibverbs-devel BuildRequires: cmake Requires: fio = 3.7-3.el8 Requires: qemu-kvm = 4.2.0-29.el8.6 @@ -58,8 +59,8 @@ cp -r mon %buildroot/usr/lib/vitastor %_libdir/libfio_vitastor.so %_libdir/libfio_vitastor_blk.so %_libdir/libfio_vitastor_sec.so -%_libdir/libvitastor_blk.so -%_libdir/libvitastor_client.so +%_libdir/libvitastor_blk.so* +%_libdir/libvitastor_client.so* /usr/lib/vitastor diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7778289b..54a0c4ff 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -9,7 +9,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$") set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}") endif() 
-add_definitions(-DVERSION="0.6.2") +add_definitions(-DVERSION="0.6.3") add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -I ${CMAKE_SOURCE_DIR}/src) if (${WITH_ASAN}) add_definitions(-fsanitize=address -fno-omit-frame-pointer) @@ -23,11 +23,16 @@ endif() find_package(PkgConfig) pkg_check_modules(LIBURING REQUIRED liburing) +pkg_check_modules(IBVERBS libibverbs) +if (IBVERBS_LIBRARIES) + add_definitions(-DWITH_RDMA) +endif (IBVERBS_LIBRARIES) include_directories( ../ /usr/include/jerasure ${LIBURING_INCLUDE_DIRS} + ${IBVERBS_INCLUDE_DIRS} ) # libvitastor_blk.so @@ -36,20 +41,25 @@ add_library(vitastor_blk SHARED blockstore_write.cpp blockstore_sync.cpp blockstore_stable.cpp blockstore_rollback.cpp blockstore_flush.cpp crc32c.c ringloop.cpp ) target_link_libraries(vitastor_blk - vitastor_common ${LIBURING_LIBRARIES} tcmalloc_minimal + # for timerfd_manager + vitastor_common ) -set_target_properties(vitastor_blk PROPERTIES VERSION ${VERSION_STRING} SOVERSION 0) +set_target_properties(vitastor_blk PROPERTIES VERSION ${VERSION} SOVERSION 0) # libvitastor_common.a +set(MSGR_RDMA "") +if (IBVERBS_LIBRARIES) + set(MSGR_RDMA "msgr_rdma.cpp") +endif (IBVERBS_LIBRARIES) add_library(vitastor_common STATIC epoll_manager.cpp etcd_state_client.cpp messenger.cpp msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp - http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp + http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp ${MSGR_RDMA} ) target_compile_options(vitastor_common PUBLIC -fPIC) - + # vitastor-osd add_executable(vitastor-osd osd_main.cpp osd.cpp osd_secondary.cpp osd_peering.cpp osd_flush.cpp osd_peering_pg.cpp @@ -60,6 +70,7 @@ target_link_libraries(vitastor-osd vitastor_common vitastor_blk Jerasure + ${IBVERBS_LIBRARIES} ) @@ -71,8 +82,9 @@ target_link_libraries(vitastor_client vitastor_common tcmalloc_minimal ${LIBURING_LIBRARIES} + 
${IBVERBS_LIBRARIES} ) -set_target_properties(vitastor_client PROPERTIES VERSION ${VERSION_STRING} SOVERSION 0) +set_target_properties(vitastor_client PROPERTIES VERSION ${VERSION} SOVERSION 0) # vitastor-nbd add_executable(vitastor-nbd @@ -116,6 +128,7 @@ add_executable(stub_uring_osd target_link_libraries(stub_uring_osd vitastor_common ${LIBURING_LIBRARIES} + ${IBVERBS_LIBRARIES} tcmalloc_minimal ) diff --git a/src/blockstore.cpp b/src/blockstore.cpp index bb4b4112..1a2cf629 100644 --- a/src/blockstore.cpp +++ b/src/blockstore.cpp @@ -43,11 +43,6 @@ int blockstore_t::read_bitmap(object_id oid, uint64_t target_version, void *bitm return impl->read_bitmap(oid, target_version, bitmap, result_version); } -std::unordered_map & blockstore_t::get_unstable_writes() -{ - return impl->unstable_writes; -} - std::map & blockstore_t::get_inode_space_stats() { return impl->inode_space_stats; diff --git a/src/blockstore.h b/src/blockstore.h index 07817bf8..d3a79fd0 100644 --- a/src/blockstore.h +++ b/src/blockstore.h @@ -183,9 +183,6 @@ public: // Simplified synchronous operation: get object bitmap & current version int read_bitmap(object_id oid, uint64_t target_version, void *bitmap, uint64_t *result_version = NULL); - // Unstable writes are added here (map of object_id -> version) - std::unordered_map & get_unstable_writes(); - // Get per-inode space usage statistics std::map & get_inode_space_stats(); diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index d0173cd0..408e30a7 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -16,6 +16,8 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config) { + config = osd_messenger_t::read_config(config); + this->ringloop = ringloop; this->tfd = tfd; this->config = config; @@ -53,6 +55,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd msgr.stop_client(op->peer_fd); delete op; }; + msgr.parse_config(this->config); 
msgr.init(); st_cli.tfd = tfd; @@ -108,6 +111,115 @@ cluster_op_t::~cluster_op_t() } } +void cluster_client_t::calc_wait(cluster_op_t *op) +{ + op->prev_wait = 0; + if (op->opcode == OSD_OP_WRITE) + { + for (auto prev = op->prev; prev; prev = prev->prev) + { + if (prev->opcode == OSD_OP_SYNC || + prev->opcode == OSD_OP_WRITE && !(op->flags & OP_FLUSH_BUFFER) && (prev->flags & OP_FLUSH_BUFFER)) + { + op->prev_wait++; + } + } + if (!op->prev_wait && pgs_loaded) + continue_rw(op); + } + else if (op->opcode == OSD_OP_SYNC) + { + for (auto prev = op->prev; prev; prev = prev->prev) + { + if (prev->opcode == OSD_OP_SYNC || prev->opcode == OSD_OP_WRITE) + { + op->prev_wait++; + } + } + if (!op->prev_wait && pgs_loaded) + continue_sync(op); + } + else + { + for (auto prev = op->prev; prev; prev = prev->prev) + { + if (prev->opcode == OSD_OP_WRITE && prev->flags & OP_FLUSH_BUFFER) + { + op->prev_wait++; + } + else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ) + { + // Flushes are always in the beginning + break; + } + } + if (!op->prev_wait && pgs_loaded) + continue_rw(op); + } +} + +void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc) +{ + if (opcode == OSD_OP_WRITE) + { + while (next) + { + auto n2 = next->next; + if (next->opcode == OSD_OP_SYNC || + next->opcode == OSD_OP_WRITE && (flags & OP_FLUSH_BUFFER) && !(next->flags & OP_FLUSH_BUFFER) || + next->opcode == OSD_OP_READ && (flags & OP_FLUSH_BUFFER)) + { + next->prev_wait += inc; + if (!next->prev_wait) + { + if (next->opcode == OSD_OP_SYNC) + continue_sync(next); + else + continue_rw(next); + } + } + next = n2; + } + } + else if (opcode == OSD_OP_SYNC) + { + while (next) + { + auto n2 = next->next; + if (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE) + { + next->prev_wait += inc; + if (!next->prev_wait) + { + if (next->opcode == OSD_OP_SYNC) + continue_sync(next); + else + continue_rw(next); + } + } + next = n2; + } + } +} + +void 
cluster_client_t::erase_op(cluster_op_t *op) +{ + uint64_t opcode = op->opcode, flags = op->flags; + cluster_op_t *next = op->next; + if (op->prev) + op->prev->next = op->next; + if (op->next) + op->next->prev = op->prev; + if (op_queue_head == op) + op_queue_head = op->next; + if (op_queue_tail == op) + op_queue_tail = op->prev; + op->next = op->prev = NULL; + std::function(op->callback)(op); + if (!immediate_commit) + inc_wait(opcode, flags, next, -1); +} + void cluster_client_t::continue_ops(bool up_retry) { if (!pgs_loaded) @@ -118,60 +230,25 @@ void cluster_client_t::continue_ops(bool up_retry) if (continuing_ops) { // Attempt to reenter the function - continuing_ops = 2; return; } restart: continuing_ops = 1; - op_queue_pos = 0; - bool has_flushes = false, has_writes = false; - while (op_queue_pos < op_queue.size()) + for (auto op = op_queue_head; op; ) { - auto op = op_queue[op_queue_pos]; - bool rm = false, is_flush = op->flags & OP_FLUSH_BUFFER; - auto opcode = op->opcode; + cluster_op_t *next_op = op->next; if (!op->up_wait || up_retry) { op->up_wait = false; - if (opcode == OSD_OP_READ || opcode == OSD_OP_WRITE) + if (!op->prev_wait) { - if (is_flush || !has_flushes) - { - // Regular writes can't proceed before buffer flushes - rm = continue_rw(op); - } - } - else if (opcode == OSD_OP_SYNC) - { - if (!has_writes) - { - // SYNC can't proceed before previous writes - rm = continue_sync(op); - } + if (op->opcode == OSD_OP_SYNC) + continue_sync(op); + else + continue_rw(op); } } - if (opcode == OSD_OP_WRITE) - { - has_writes = has_writes || !rm; - if (is_flush) - { - has_flushes = has_writes || !rm; - } - } - else if (opcode == OSD_OP_SYNC) - { - // Postpone writes until previous SYNC completes - // ...so dirty_writes can't contain anything newer than SYNC - has_flushes = has_writes || !rm; - } - if (rm) - { - op_queue.erase(op_queue.begin()+op_queue_pos, op_queue.begin()+op_queue_pos+1); - } - else - { - op_queue_pos++; - } + op = next_op; if 
(continuing_ops == 2) { goto restart; @@ -213,11 +290,8 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config) { throw std::runtime_error("Bad block size"); } - if (config["immediate_commit"] == "all") - { - // Cluster-wide immediate_commit mode - immediate_commit = true; - } + // Cluster-wide immediate_commit mode + immediate_commit = (config["immediate_commit"] == "all"); if (config.find("client_max_dirty_bytes") != config.end()) { client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value(); @@ -281,7 +355,7 @@ void cluster_client_t::on_change_hook(std::map & changes { // At this point, all pool operations should have been suspended // And now they have to be resliced! - for (auto op: op_queue) + for (auto op = op_queue_head; op; op = op->next) { if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ) && INODE_POOL(op->cur_inode) == pool_item.first) @@ -362,9 +436,17 @@ void cluster_client_t::execute(cluster_op_t *op) { delete sync_op; }; - op_queue.push_back(sync_op); + sync_op->prev = op_queue_tail; + if (op_queue_tail) + { + op_queue_tail->next = sync_op; + op_queue_tail = sync_op; + } + else + op_queue_tail = op_queue_head = sync_op; dirty_bytes = 0; dirty_ops = 0; + calc_wait(sync_op); } dirty_bytes += op->len; dirty_ops++; @@ -374,8 +456,23 @@ void cluster_client_t::execute(cluster_op_t *op) dirty_bytes = 0; dirty_ops = 0; } - op_queue.push_back(op); - continue_ops(); + op->prev = op_queue_tail; + if (op_queue_tail) + { + op_queue_tail->next = op; + op_queue_tail = op; + } + else + op_queue_tail = op_queue_head = op; + if (!immediate_commit) + calc_wait(op); + else if (pgs_loaded) + { + if (op->opcode == OSD_OP_SYNC) + continue_sync(op); + else + continue_rw(op); + } } void cluster_client_t::copy_write(cluster_op_t *op, std::map & dirty_buffers) @@ -474,12 +571,16 @@ void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr) } delete op; }; - op_queue.insert(op_queue.begin(), op); - if 
(continuing_ops) + op->next = op_queue_head; + if (op_queue_head) { - continuing_ops = 2; - op_queue_pos++; + op_queue_head->prev = op; + op_queue_head = op; } + else + op_queue_tail = op_queue_head = op; + inc_wait(op->opcode, op->flags, op->next, 1); + continue_rw(op); } int cluster_client_t::continue_rw(cluster_op_t *op) @@ -496,7 +597,7 @@ resume_0: if (!op->len || op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity) { op->retval = -EINVAL; - std::function(op->callback)(op); + erase_op(op); return 1; } { @@ -504,7 +605,7 @@ resume_0: if (!pool_id) { op->retval = -EINVAL; - std::function(op->callback)(op); + erase_op(op); return 1; } if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() || @@ -520,7 +621,7 @@ resume_0: if (ino_it != st_cli.inode_config.end() && ino_it->second.readonly) { op->retval = -EINVAL; - std::function(op->callback)(op); + erase_op(op); return 1; } if (!immediate_commit && !(op->flags & OP_FLUSH_BUFFER)) @@ -603,13 +704,13 @@ resume_3: } } op->retval = op->len; - std::function(op->callback)(op); + erase_op(op); return 1; } else if (op->retval != 0 && op->retval != -EPIPE) { // Fatal error (not -EPIPE) - std::function(op->callback)(op); + erase_op(op); return 1; } else @@ -849,17 +950,18 @@ int cluster_client_t::continue_sync(cluster_op_t *op) { // Sync is not required in the immediate_commit mode or if there are no dirty_osds op->retval = 0; - std::function(op->callback)(op); + erase_op(op); return 1; } // Check that all OSD connections are still alive - for (auto sync_osd: dirty_osds) + for (auto do_it = dirty_osds.begin(); do_it != dirty_osds.end(); ) { + osd_num_t sync_osd = *do_it; auto peer_it = msgr.osd_peer_fds.find(sync_osd); if (peer_it == msgr.osd_peer_fds.end()) - { - return 0; - } + dirty_osds.erase(do_it++); + else + do_it++; } // Post sync to affected OSDs for (auto & prev_op: dirty_buffers) @@ -924,7 +1026,7 @@ resume_1: uw_it++; } } - std::function(op->callback)(op); + erase_op(op); return 1; } 
@@ -1008,7 +1110,10 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) } if (op->inflight_count == 0) { - continue_ops(); + if (op->opcode == OSD_OP_SYNC) + continue_sync(op); + else + continue_rw(op); } } diff --git a/src/cluster_client.h b/src/cluster_client.h index 14b0d474..dcf3db32 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -36,7 +36,7 @@ struct cluster_op_t std::function callback; ~cluster_op_t(); protected: - int flags = 0; + uint64_t flags = 0; int state = 0; uint64_t cur_inode; // for snapshot reads void *buf = NULL; @@ -47,6 +47,8 @@ protected: std::vector parts; void *bitmap_buf = NULL, *part_bitmaps = NULL; unsigned bitmap_buf_size = 0; + cluster_op_t *prev = NULL, *next = NULL; + int prev_wait = 0; friend class cluster_client_t; }; @@ -66,7 +68,8 @@ class cluster_client_t uint64_t bs_block_size = 0; uint32_t bs_bitmap_granularity = 0, bs_bitmap_size = 0; std::map pg_counts; - bool immediate_commit = false; + // WARNING: initially true so execute() doesn't create fake sync + bool immediate_commit = true; // FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory. 
uint64_t client_max_dirty_bytes = 0; uint64_t client_max_dirty_ops = 0; @@ -76,7 +79,7 @@ class cluster_client_t int retry_timeout_id = 0; uint64_t op_id = 1; std::vector offline_ops; - std::vector op_queue; + cluster_op_t *op_queue_head = NULL, *op_queue_tail = NULL; std::map dirty_buffers; std::set dirty_osds; uint64_t dirty_bytes = 0, dirty_ops = 0; @@ -88,7 +91,6 @@ class cluster_client_t ring_consumer_t consumer; std::vector> on_ready_hooks; int continuing_ops = 0; - int op_queue_pos = 0; public: etcd_state_client_t st_cli; @@ -117,4 +119,7 @@ protected: void send_sync(cluster_op_t *op, cluster_op_part_t *part); void handle_op_part(cluster_op_part_t *part); void copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *part); + void erase_op(cluster_op_t *op); + void calc_wait(cluster_op_t *op); + void inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc); }; diff --git a/src/etcd_state_client.cpp b/src/etcd_state_client.cpp index 8d9cc5d4..15b7c833 100644 --- a/src/etcd_state_client.cpp +++ b/src/etcd_state_client.cpp @@ -50,6 +50,11 @@ void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function< void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function callback) { + if (!etcd_addresses.size()) + { + fprintf(stderr, "etcd_address is missing in Vitastor configuration\n"); + exit(1); + } std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()]; std::string etcd_api_path; int pos = etcd_address.find('/'); @@ -85,7 +90,7 @@ void etcd_state_client_t::add_etcd_url(std::string addr) } } -void etcd_state_client_t::parse_config(json11::Json & config) +void etcd_state_client_t::parse_config(const json11::Json & config) { this->etcd_addresses.clear(); if (config["etcd_address"].is_string()) @@ -122,6 +127,11 @@ void etcd_state_client_t::parse_config(json11::Json & config) void etcd_state_client_t::start_etcd_watcher() { + if (!etcd_addresses.size()) + { + fprintf(stderr, 
"etcd_address is missing in Vitastor configuration\n"); + exit(1); + } std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()]; std::string etcd_api_path; int pos = etcd_address.find('/'); @@ -342,7 +352,7 @@ void etcd_state_client_t::load_pgs() }); } #else -void etcd_state_client_t::parse_config(json11::Json & config) +void etcd_state_client_t::parse_config(const json11::Json & config) { } diff --git a/src/etcd_state_client.h b/src/etcd_state_client.h index 7e0a9570..c74250c3 100644 --- a/src/etcd_state_client.h +++ b/src/etcd_state_client.h @@ -106,7 +106,7 @@ public: void load_global_config(); void load_pgs(); void parse_state(const etcd_kv_t & kv); - void parse_config(json11::Json & config); + void parse_config(const json11::Json & config); inode_watch_t* watch_inode(std::string name); void close_watch(inode_watch_t* watch); ~etcd_state_client_t(); diff --git a/src/fio_cluster.cpp b/src/fio_cluster.cpp index e7650f53..0797be40 100644 --- a/src/fio_cluster.cpp +++ b/src/fio_cluster.cpp @@ -24,7 +24,6 @@ #include #include -#include #include "epoll_manager.h" #include "cluster_client.h" @@ -46,6 +45,7 @@ struct sec_data struct sec_options { int __pad; + char *config_path = NULL; char *etcd_host = NULL; char *etcd_prefix = NULL; char *image = NULL; @@ -53,9 +53,23 @@ struct sec_options uint64_t inode = 0; int cluster_log = 0; int trace = 0; + int use_rdma = 0; + char *rdma_device = NULL; + int rdma_port_num = 0; + int rdma_gid_index = 0; + int rdma_mtu = 0; }; static struct fio_option options[] = { + { + .name = "conf", + .lname = "Vitastor config path", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct sec_options, config_path), + .help = "Vitastor config path", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, { .name = "etcd", .lname = "etcd address", @@ -121,6 +135,55 @@ static struct fio_option options[] = { .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_FILENAME, }, + { + .name = "use_rdma", + .lname = "Use RDMA", + 
.type = FIO_OPT_BOOL, + .off1 = offsetof(struct sec_options, use_rdma), + .help = "Use RDMA", + .def = "-1", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, + { + .name = "rdma_device", + .lname = "RDMA device name", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct sec_options, rdma_device), + .help = "RDMA device name", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, + { + .name = "rdma_port_num", + .lname = "RDMA port number", + .type = FIO_OPT_INT, + .off1 = offsetof(struct sec_options, rdma_port_num), + .help = "RDMA port number", + .def = "0", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, + { + .name = "rdma_gid_index", + .lname = "RDMA gid index", + .type = FIO_OPT_INT, + .off1 = offsetof(struct sec_options, rdma_gid_index), + .help = "RDMA gid index", + .def = "0", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, + { + .name = "rdma_mtu", + .lname = "RDMA path MTU", + .type = FIO_OPT_INT, + .off1 = offsetof(struct sec_options, rdma_mtu), + .help = "RDMA path MTU", + .def = "0", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, { .name = NULL, }, @@ -131,12 +194,6 @@ static int sec_setup(struct thread_data *td) sec_options *o = (sec_options*)td->eo; sec_data *bsd; - if (!o->etcd_host) - { - td_verror(td, EINVAL, "etcd address is missing"); - return 1; - } - bsd = new sec_data; if (!bsd) { @@ -152,11 +209,26 @@ static int sec_setup(struct thread_data *td) td->o.open_files++; } - json11::Json cfg = json11::Json::object { - { "etcd_address", std::string(o->etcd_host) }, - { "etcd_prefix", std::string(o->etcd_prefix ? 
o->etcd_prefix : "/vitastor") }, - { "log_level", o->cluster_log }, - }; + json11::Json::object cfg; + if (o->config_path) + cfg["config_path"] = std::string(o->config_path); + if (o->etcd_host) + cfg["etcd_address"] = std::string(o->etcd_host); + if (o->etcd_prefix) + cfg["etcd_prefix"] = std::string(o->etcd_prefix); + if (o->rdma_device) + cfg["rdma_device"] = std::string(o->rdma_device); + if (o->rdma_port_num) + cfg["rdma_port_num"] = o->rdma_port_num; + if (o->rdma_gid_index) + cfg["rdma_gid_index"] = o->rdma_gid_index; + if (o->rdma_mtu) + cfg["rdma_mtu"] = o->rdma_mtu; + if (o->cluster_log) + cfg["log_level"] = o->cluster_log; + if (o->use_rdma != -1) + cfg["use_rdma"] = o->use_rdma; + json11::Json cfg_json(cfg); if (!o->image) { @@ -181,7 +253,7 @@ static int sec_setup(struct thread_data *td) } bsd->ringloop = new ring_loop_t(512); bsd->epmgr = new epoll_manager_t(bsd->ringloop); - bsd->cli = new cluster_client_t(bsd->ringloop, bsd->epmgr->tfd, cfg); + bsd->cli = new cluster_client_t(bsd->ringloop, bsd->epmgr->tfd, cfg_json); if (o->image) { while (!bsd->cli->is_ready()) diff --git a/src/messenger.cpp b/src/messenger.cpp index 572bcf98..59dc0c21 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -12,6 +12,31 @@ void osd_messenger_t::init() { +#ifdef WITH_RDMA + if (use_rdma) + { + rdma_context = msgr_rdma_context_t::create( + rdma_device != "" ? rdma_device.c_str() : NULL, + rdma_port_num, rdma_gid_index, rdma_mtu + ); + if (!rdma_context) + { + printf("[OSD %lu] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num); + } + else + { + rdma_max_sge = rdma_max_sge < rdma_context->attrx.orig_attr.max_sge + ? 
rdma_max_sge : rdma_context->attrx.orig_attr.max_sge; + printf("[OSD %lu] RDMA initialized successfully\n", osd_num); + fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK); + tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events) + { + handle_rdma_events(); + }); + handle_rdma_events(); + } + } +#endif keepalive_timer_id = tfd->set_timer(1000, true, [this](int) { std::vector to_stop; @@ -19,7 +44,7 @@ void osd_messenger_t::init() for (auto cl_it = clients.begin(); cl_it != clients.end(); cl_it++) { auto cl = cl_it->second; - if (!cl->osd_num || cl->peer_state != PEER_CONNECTED) + if (!cl->osd_num || cl->peer_state != PEER_CONNECTED && cl->peer_state != PEER_RDMA) { // Do not run keepalive on regular clients continue; @@ -94,32 +119,58 @@ osd_messenger_t::~osd_messenger_t() { stop_client(clients.begin()->first, true); } +#ifdef WITH_RDMA + if (rdma_context) + { + delete rdma_context; + } +#endif } void osd_messenger_t::parse_config(const json11::Json & config) { +#ifdef WITH_RDMA + if (!config["use_rdma"].is_null()) + { + // RDMA is on by default in RDMA-enabled builds + this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0; + } + this->rdma_device = config["rdma_device"].string_value(); + this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value(); + if (!this->rdma_port_num) + this->rdma_port_num = 1; + this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value(); + this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value(); + this->rdma_max_sge = config["rdma_max_sge"].uint64_value(); + if (!this->rdma_max_sge) + this->rdma_max_sge = 128; + this->rdma_max_send = config["rdma_max_send"].uint64_value(); + if (!this->rdma_max_send) + this->rdma_max_send = 32; + this->rdma_max_recv = config["rdma_max_recv"].uint64_value(); + if (!this->rdma_max_recv) + this->rdma_max_recv = 8; + this->rdma_max_msg = 
config["rdma_max_msg"].uint64_value(); + if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024) + this->rdma_max_msg = 1024*1024; +#endif + this->receive_buffer_size = (uint32_t)config["tcp_header_buffer_size"].uint64_value(); + if (!this->receive_buffer_size || this->receive_buffer_size > 1024*1024*1024) + this->receive_buffer_size = 65536; this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() || config["use_sync_send_recv"].uint64_value(); this->peer_connect_interval = config["peer_connect_interval"].uint64_value(); if (!this->peer_connect_interval) - { - this->peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; - } + this->peer_connect_interval = 5; this->peer_connect_timeout = config["peer_connect_timeout"].uint64_value(); if (!this->peer_connect_timeout) - { - this->peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; - } + this->peer_connect_timeout = 5; this->osd_idle_timeout = config["osd_idle_timeout"].uint64_value(); if (!this->osd_idle_timeout) - { - this->osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT; - } + this->osd_idle_timeout = 5; this->osd_ping_timeout = config["osd_ping_timeout"].uint64_value(); if (!this->osd_ping_timeout) - { - this->osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT; - } + this->osd_ping_timeout = 5; this->log_level = config["log_level"].uint64_value(); } @@ -326,6 +377,24 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) }, }, }; +#ifdef WITH_RDMA + if (rdma_context) + { + cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg); + if (cl->rdma_conn) + { + json11::Json payload = json11::Json::object { + { "connect_rdma", cl->rdma_conn->addr.to_string() }, + { "rdma_max_msg", cl->rdma_conn->max_msg }, + }; + std::string payload_str = payload.dump(); + op->req.show_conf.json_len = payload_str.size(); + op->buf = malloc_or_die(payload_str.size()); + op->iov.push_back(op->buf, payload_str.size()); + memcpy(op->buf, payload_str.c_str(), 
payload_str.size()); + } + } +#endif op->callback = [this, cl](osd_op_t *op) { std::string json_err; @@ -361,12 +430,50 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) } if (err) { - osd_num_t osd_num = cl->osd_num; + osd_num_t peer_osd = cl->osd_num; stop_client(op->peer_fd); - on_connect_peer(osd_num, -1); + on_connect_peer(peer_osd, -1); delete op; return; } +#ifdef WITH_RDMA + if (config["rdma_address"].is_string()) + { + msgr_rdma_address_t addr; + if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) || + cl->rdma_conn->connect(&addr) != 0) + { + printf( + "Failed to connect to OSD %lu (address %s) using RDMA\n", + cl->osd_num, config["rdma_address"].string_value().c_str() + ); + delete cl->rdma_conn; + cl->rdma_conn = NULL; + // FIXME: Keep TCP connection in this case + osd_num_t peer_osd = cl->osd_num; + stop_client(cl->peer_fd); + on_connect_peer(peer_osd, -1); + delete op; + return; + } + else + { + uint64_t server_max_msg = config["rdma_max_msg"].uint64_value(); + if (cl->rdma_conn->max_msg > server_max_msg) + { + cl->rdma_conn->max_msg = server_max_msg; + } + if (log_level > 0) + { + printf("Connected to OSD %lu using RDMA\n", cl->osd_num); + } + cl->peer_state = PEER_RDMA; + tfd->set_fd_handler(cl->peer_fd, false, NULL); + // Add the initial receive request + try_recv_rdma(cl); + } + } +#endif osd_peer_fds[cl->osd_num] = cl->peer_fd; on_connect_peer(cl->osd_num, cl->peer_fd); delete op; @@ -408,3 +515,57 @@ void osd_messenger_t::accept_connections(int listen_fd) throw std::runtime_error(std::string("accept: ") + strerror(errno)); } } + +bool osd_messenger_t::is_rdma_enabled() +{ + return rdma_context != NULL; +} + +json11::Json osd_messenger_t::read_config(const json11::Json & config) +{ + const char *config_path = config["config_path"].string_value() != "" + ? 
config["config_path"].string_value().c_str() : VITASTOR_CONFIG_PATH; + int fd = open(config_path, O_RDONLY); + if (fd < 0) + { + if (errno != ENOENT) + fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno)); + return config; + } + struct stat st; + if (fstat(fd, &st) != 0) + { + fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno)); + close(fd); + return config; + } + std::string buf; + buf.resize(st.st_size); + int done = 0; + while (done < st.st_size) + { + int r = read(fd, (void*)buf.data()+done, st.st_size-done); + if (r < 0) + { + fprintf(stderr, "Error reading %s: %s\n", config_path, strerror(errno)); + close(fd); + return config; + } + done += r; + } + close(fd); + std::string json_err; + json11::Json::object file_config = json11::Json::parse(buf, json_err).object_items(); + if (json_err != "") + { + fprintf(stderr, "Invalid JSON in %s: %s\n", config_path, json_err.c_str()); + return config; + } + file_config.erase("config_path"); + file_config.erase("osd_num"); + for (auto kv: config.object_items()) + { + file_config[kv.first] = kv.second; + } + return file_config; +} diff --git a/src/messenger.h b/src/messenger.h index 34466d4f..42d58980 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -18,20 +18,32 @@ #include "timerfd_manager.h" #include +#ifdef WITH_RDMA +#include "msgr_rdma.h" +#endif + #define CL_READ_HDR 1 #define CL_READ_DATA 2 #define CL_READ_REPLY_DATA 3 #define CL_WRITE_READY 1 -#define CL_WRITE_REPLY 2 #define PEER_CONNECTING 1 #define PEER_CONNECTED 2 -#define PEER_STOPPED 3 +#define PEER_RDMA_CONNECTING 3 +#define PEER_RDMA 4 +#define PEER_STOPPED 5 -#define DEFAULT_PEER_CONNECT_INTERVAL 5 -#define DEFAULT_PEER_CONNECT_TIMEOUT 5 -#define DEFAULT_OSD_PING_TIMEOUT 5 #define DEFAULT_BITMAP_GRANULARITY 4096 +#define VITASTOR_CONFIG_PATH "/etc/vitastor/vitastor.conf" + +#define MSGR_SENDP_HDR 1 +#define MSGR_SENDP_FREE 2 + +struct msgr_sendp_t +{ + osd_op_t *op; + int flags; +}; struct osd_client_t { @@ -48,6 
+60,10 @@ struct osd_client_t void *in_buf = NULL; +#ifdef WITH_RDMA + msgr_rdma_connection_t *rdma_conn = NULL; +#endif + // Read state int read_ready = 0; osd_op_t *read_op = NULL; @@ -70,7 +86,7 @@ struct osd_client_t msghdr write_msg = { 0 }; int write_state = 0; std::vector send_list, next_send_list; - std::vector outbox, next_outbox; + std::vector outbox, next_outbox; ~osd_client_t() { @@ -104,15 +120,23 @@ struct osd_messenger_t protected: int keepalive_timer_id = -1; - // FIXME: make receive_buffer_size configurable - int receive_buffer_size = 64*1024; - int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; - int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; - int osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT; - int osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT; + uint32_t receive_buffer_size = 0; + int peer_connect_interval = 0; + int peer_connect_timeout = 0; + int osd_idle_timeout = 0; + int osd_ping_timeout = 0; int log_level = 0; bool use_sync_send_recv = false; +#ifdef WITH_RDMA + bool use_rdma = true; + std::string rdma_device; + uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0; + msgr_rdma_context_t *rdma_context = NULL; + uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 8; + uint64_t rdma_max_msg = 0; +#endif + std::vector read_ready_clients; std::vector write_ready_clients; std::vector> set_immediate; @@ -141,6 +165,13 @@ public: void accept_connections(int listen_fd); ~osd_messenger_t(); + static json11::Json read_config(const json11::Json & config); + +#ifdef WITH_RDMA + bool is_rdma_enabled(); + bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg); +#endif + protected: void try_connect_peer(uint64_t osd_num); void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port); @@ -156,8 +187,15 @@ protected: void handle_send(int result, osd_client_t *cl); bool handle_read(int result, osd_client_t *cl); + bool handle_read_buffer(osd_client_t *cl, void *curbuf, int 
remain); bool handle_finished_read(osd_client_t *cl); void handle_op_hdr(osd_client_t *cl); bool handle_reply_hdr(osd_client_t *cl); void handle_reply_ready(osd_op_t *op); + +#ifdef WITH_RDMA + bool try_send_rdma(osd_client_t *cl); + bool try_recv_rdma(osd_client_t *cl); + void handle_rdma_events(); +#endif }; diff --git a/src/mock/messenger.cpp b/src/mock/messenger.cpp index 0fcd3cbd..7ba9ffa4 100644 --- a/src/mock/messenger.cpp +++ b/src/mock/messenger.cpp @@ -42,3 +42,8 @@ void osd_messenger_t::read_requests() void osd_messenger_t::send_replies() { } + +json11::Json osd_messenger_t::read_config(const json11::Json & config) +{ + return config; +} diff --git a/src/msgr_rdma.cpp b/src/msgr_rdma.cpp new file mode 100644 index 00000000..e082bf32 --- /dev/null +++ b/src/msgr_rdma.cpp @@ -0,0 +1,521 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#include +#include +#include "msgr_rdma.h" +#include "messenger.h" + +std::string msgr_rdma_address_t::to_string() +{ + char msg[sizeof "0000:00000000:00000000:00000000000000000000000000000000"]; + sprintf( + msg, "%04x:%06x:%06x:%016lx%016lx", lid, qpn, psn, + htobe64(((uint64_t*)&gid)[0]), htobe64(((uint64_t*)&gid)[1]) + ); + return std::string(msg); +} + +bool msgr_rdma_address_t::from_string(const char *str, msgr_rdma_address_t *dest) +{ + uint64_t* gid = (uint64_t*)&dest->gid; + int n = sscanf( + str, "%hx:%x:%x:%16lx%16lx", &dest->lid, &dest->qpn, &dest->psn, gid, gid+1 + ); + gid[0] = be64toh(gid[0]); + gid[1] = be64toh(gid[1]); + return n == 5; +} + +msgr_rdma_context_t::~msgr_rdma_context_t() +{ + if (cq) + ibv_destroy_cq(cq); + if (channel) + ibv_destroy_comp_channel(channel); + if (mr) + ibv_dereg_mr(mr); + if (pd) + ibv_dealloc_pd(pd); + if (context) + ibv_close_device(context); +} + +msgr_rdma_connection_t::~msgr_rdma_connection_t() +{ + ctx->used_max_cqe -= max_send+max_recv; + if (qp) + ibv_destroy_qp(qp); +} + +msgr_rdma_context_t 
*msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu) +{ + int res; + ibv_device **dev_list = NULL; + msgr_rdma_context_t *ctx = new msgr_rdma_context_t(); + ctx->mtu = mtu; + + dev_list = ibv_get_device_list(NULL); + if (!dev_list) + { + fprintf(stderr, "Failed to get RDMA device list: %s\n", strerror(errno)); + goto cleanup; + } + if (!ib_devname) + { + ctx->dev = *dev_list; + if (!ctx->dev) + { + fprintf(stderr, "No RDMA devices found\n"); + goto cleanup; + } + } + else + { + int i; + for (i = 0; dev_list[i]; ++i) + if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname)) + break; + ctx->dev = dev_list[i]; + if (!ctx->dev) + { + fprintf(stderr, "RDMA device %s not found\n", ib_devname); + goto cleanup; + } + } + + ctx->context = ibv_open_device(ctx->dev); + if (!ctx->context) + { + fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(ctx->dev)); + goto cleanup; + } + + ctx->ib_port = ib_port; + ctx->gid_index = gid_index; + if ((res = ibv_query_port(ctx->context, ib_port, &ctx->portinfo)) != 0) + { + fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(ctx->dev), ib_port, strerror(res)); + goto cleanup; + } + ctx->my_lid = ctx->portinfo.lid; + if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid) + { + fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev)); + goto cleanup; + } + if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid)) + { + fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index); + goto cleanup; + } + + ctx->pd = ibv_alloc_pd(ctx->context); + if (!ctx->pd) + { + fprintf(stderr, "Couldn't allocate RDMA protection domain\n"); + goto cleanup; + } + + { + if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx)) + { + fprintf(stderr, "Couldn't query RDMA device for its features\n"); + goto 
cleanup; + } + if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) || + !(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) || + !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) || + !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)) + { + fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n"); + goto cleanup; + } + } + + ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND); + if (!ctx->mr) + { + fprintf(stderr, "Couldn't register RDMA memory region\n"); + goto cleanup; + } + + ctx->channel = ibv_create_comp_channel(ctx->context); + if (!ctx->channel) + { + fprintf(stderr, "Couldn't create RDMA completion channel\n"); + goto cleanup; + } + + ctx->max_cqe = 4096; + ctx->cq = ibv_create_cq(ctx->context, ctx->max_cqe, NULL, ctx->channel, 0); + if (!ctx->cq) + { + fprintf(stderr, "Couldn't create RDMA completion queue\n"); + goto cleanup; + } + + if (dev_list) + ibv_free_device_list(dev_list); + return ctx; + +cleanup: + delete ctx; + if (dev_list) + ibv_free_device_list(dev_list); + return NULL; +} + +msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send, + uint32_t max_recv, uint32_t max_sge, uint32_t max_msg) +{ + msgr_rdma_connection_t *conn = new msgr_rdma_connection_t; + + max_sge = max_sge > ctx->attrx.orig_attr.max_sge ? 
ctx->attrx.orig_attr.max_sge : max_sge; + + conn->ctx = ctx; + conn->max_send = max_send; + conn->max_recv = max_recv; + conn->max_sge = max_sge; + conn->max_msg = max_msg; + + ctx->used_max_cqe += max_send+max_recv; + if (ctx->used_max_cqe > ctx->max_cqe) + { + // Resize CQ + // Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ + int new_max_cqe = ctx->max_cqe; + while (ctx->used_max_cqe > new_max_cqe) + { + new_max_cqe *= 2; + } + if (ibv_resize_cq(ctx->cq, new_max_cqe) != 0) + { + fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe); + delete conn; + return NULL; + } + ctx->max_cqe = new_max_cqe; + } + + ibv_qp_init_attr init_attr = { + .send_cq = ctx->cq, + .recv_cq = ctx->cq, + .cap = { + .max_send_wr = max_send, + .max_recv_wr = max_recv, + .max_send_sge = max_sge, + .max_recv_sge = max_sge, + }, + .qp_type = IBV_QPT_RC, + }; + conn->qp = ibv_create_qp(ctx->pd, &init_attr); + if (!conn->qp) + { + fprintf(stderr, "Couldn't create RDMA queue pair\n"); + delete conn; + return NULL; + } + + conn->addr.lid = ctx->my_lid; + conn->addr.gid = ctx->my_gid; + conn->addr.qpn = conn->qp->qp_num; + conn->addr.psn = lrand48() & 0xffffff; + + ibv_qp_attr attr = { + .qp_state = IBV_QPS_INIT, + .qp_access_flags = 0, + .pkey_index = 0, + .port_num = ctx->ib_port, + }; + + if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS)) + { + fprintf(stderr, "Failed to switch RDMA queue pair to INIT state\n"); + delete conn; + return NULL; + } + + return conn; +} + +static ibv_mtu mtu_to_ibv_mtu(uint32_t mtu) +{ + switch (mtu) + { + case 256: return IBV_MTU_256; + case 512: return IBV_MTU_512; + case 1024: return IBV_MTU_1024; + case 2048: return IBV_MTU_2048; + case 4096: return IBV_MTU_4096; + } + return IBV_MTU_4096; +} + +int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest) +{ + auto conn = this; + ibv_qp_attr attr = { + .qp_state = IBV_QPS_RTR, 
+ .path_mtu = mtu_to_ibv_mtu(conn->ctx->mtu), + .rq_psn = dest->psn, + .sq_psn = conn->addr.psn, + .dest_qp_num = dest->qpn, + .ah_attr = { + .grh = { + .dgid = dest->gid, + .sgid_index = conn->ctx->gid_index, + .hop_limit = 1, // FIXME can it vary? + }, + .dlid = dest->lid, + .sl = 0, // service level + .src_path_bits = 0, + .is_global = (uint8_t)(dest->gid.global.interface_id ? 1 : 0), + .port_num = conn->ctx->ib_port, + }, + .max_rd_atomic = 1, + .max_dest_rd_atomic = 1, + // Timeout and min_rnr_timer actual values seem to be 4.096us*2^(timeout+1) + .min_rnr_timer = 1, + .timeout = 14, + .retry_cnt = 7, + .rnr_retry = 7, + }; + // FIXME No idea if ibv_modify_qp is a blocking operation or not. No idea if it has a timeout and what it is. + if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | + IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER)) + { + fprintf(stderr, "Failed to switch RDMA queue pair to RTR (ready-to-receive) state\n"); + return 1; + } + attr.qp_state = IBV_QPS_RTS; + if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_TIMEOUT | + IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC)) + { + fprintf(stderr, "Failed to switch RDMA queue pair to RTS (ready-to-send) state\n"); + return 1; + } + return 0; +} + +bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg) +{ + // Try to connect to the peer using RDMA + msgr_rdma_address_t addr; + if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr)) + { + if (client_max_msg > rdma_max_msg) + { + client_max_msg = rdma_max_msg; + } + auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg); + if (rdma_conn) + { + int r = rdma_conn->connect(&addr); + if (r != 0) + { + delete rdma_conn; + printf( + "Failed to connect RDMA queue pair to %s (client %d)\n", + addr.to_string().c_str(), peer_fd + ); + } + else 
+ { + // Remember connection, but switch to RDMA only after sending the configuration response + auto cl = clients.at(peer_fd); + cl->rdma_conn = rdma_conn; + cl->peer_state = PEER_RDMA_CONNECTING; + return true; + } + } + } + return false; +} + +static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) +{ + ibv_send_wr *bad_wr = NULL; + ibv_send_wr wr = { + .wr_id = (uint64_t)(cl->peer_fd*2+1), + .sg_list = sge, + .num_sge = op_sge, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED, + }; + int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); + if (err || bad_wr) + { + printf("RDMA send failed: %s\n", strerror(err)); + exit(1); + } + cl->rdma_conn->cur_send++; +} + +bool osd_messenger_t::try_send_rdma(osd_client_t *cl) +{ + auto rc = cl->rdma_conn; + if (!cl->send_list.size() || rc->cur_send > 0) + { + // Only send one batch at a time + return true; + } + uint64_t op_size = 0, op_sge = 0; + ibv_sge sge[rc->max_sge]; + while (rc->send_pos < cl->send_list.size()) + { + iovec & iov = cl->send_list[rc->send_pos]; + if (op_size >= rc->max_msg || op_sge >= rc->max_sge) + { + try_send_rdma_wr(cl, sge, op_sge); + op_sge = 0; + op_size = 0; + if (rc->cur_send >= rc->max_send) + { + break; + } + } + uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg + ? 
iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size); + sge[op_sge++] = { + .addr = (uintptr_t)(iov.iov_base+rc->send_buf_pos), + .length = len, + .lkey = rc->ctx->mr->lkey, + }; + op_size += len; + rc->send_buf_pos += len; + if (rc->send_buf_pos >= iov.iov_len) + { + rc->send_pos++; + rc->send_buf_pos = 0; + } + } + if (op_sge > 0) + { + try_send_rdma_wr(cl, sge, op_sge); + } + return true; +} + +static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) +{ + ibv_recv_wr *bad_wr = NULL; + ibv_recv_wr wr = { + .wr_id = (uint64_t)(cl->peer_fd*2), + .sg_list = sge, + .num_sge = op_sge, + }; + int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr); + if (err || bad_wr) + { + printf("RDMA receive failed: %s\n", strerror(err)); + exit(1); + } + cl->rdma_conn->cur_recv++; +} + +bool osd_messenger_t::try_recv_rdma(osd_client_t *cl) +{ + auto rc = cl->rdma_conn; + while (rc->cur_recv < rc->max_recv) + { + void *buf = malloc_or_die(rc->max_msg); + rc->recv_buffers.push_back(buf); + ibv_sge sge = { + .addr = (uintptr_t)buf, + .length = (uint32_t)rc->max_msg, + .lkey = rc->ctx->mr->lkey, + }; + try_recv_rdma_wr(cl, &sge, 1); + } + return true; +} + +#define RDMA_EVENTS_AT_ONCE 32 + +void osd_messenger_t::handle_rdma_events() +{ + // Request next notification + ibv_cq *ev_cq; + void *ev_ctx; + // FIXME: This is inefficient as it calls read()... 
+ if (ibv_get_cq_event(rdma_context->channel, &ev_cq, &ev_ctx) == 0) + { + ibv_ack_cq_events(rdma_context->cq, 1); + } + if (ibv_req_notify_cq(rdma_context->cq, 0) != 0) + { + printf("Failed to request RDMA completion notification, exiting\n"); + exit(1); + } + ibv_wc wc[RDMA_EVENTS_AT_ONCE]; + int event_count; + do + { + event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc); + for (int i = 0; i < event_count; i++) + { + int client_id = wc[i].wr_id >> 1; + bool is_send = wc[i].wr_id & 1; + auto cl_it = clients.find(client_id); + if (cl_it == clients.end()) + { + continue; + } + osd_client_t *cl = cl_it->second; + if (wc[i].status != IBV_WC_SUCCESS) + { + printf("RDMA work request failed for client %d", client_id); + if (cl->osd_num) + { + printf(" (OSD %lu)", cl->osd_num); + } + printf(" with status: %s, stopping client\n", ibv_wc_status_str(wc[i].status)); + stop_client(client_id); + continue; + } + if (!is_send) + { + cl->rdma_conn->cur_recv--; + handle_read_buffer(cl, cl->rdma_conn->recv_buffers[0], wc[i].byte_len); + free(cl->rdma_conn->recv_buffers[0]); + cl->rdma_conn->recv_buffers.erase(cl->rdma_conn->recv_buffers.begin(), cl->rdma_conn->recv_buffers.begin()+1); + try_recv_rdma(cl); + } + else + { + cl->rdma_conn->cur_send--; + if (!cl->rdma_conn->cur_send) + { + // Wait for the whole batch + for (int i = 0; i < cl->rdma_conn->send_pos; i++) + { + if (cl->outbox[i].flags & MSGR_SENDP_FREE) + { + // Reply fully sent + delete cl->outbox[i].op; + } + } + if (cl->rdma_conn->send_pos > 0) + { + cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos); + cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos); + cl->rdma_conn->send_pos = 0; + } + if (cl->rdma_conn->send_buf_pos > 0) + { + cl->send_list[0].iov_base += cl->rdma_conn->send_buf_pos; + cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos; + cl->rdma_conn->send_buf_pos = 0; + } + try_send_rdma(cl); + } + } + } + } while 
(event_count > 0); + for (auto cb: set_immediate) + { + cb(); + } + set_immediate.clear(); +} diff --git a/src/msgr_rdma.h b/src/msgr_rdma.h new file mode 100644 index 00000000..4f257707 --- /dev/null +++ b/src/msgr_rdma.h @@ -0,0 +1,58 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#pragma once +#include +#include +#include + +struct msgr_rdma_address_t +{ + ibv_gid gid; + uint16_t lid; + uint32_t qpn; + uint32_t psn; + + std::string to_string(); + static bool from_string(const char *str, msgr_rdma_address_t *dest); +}; + +struct msgr_rdma_context_t +{ + ibv_context *context = NULL; + ibv_device *dev = NULL; + ibv_device_attr_ex attrx; + ibv_pd *pd = NULL; + ibv_mr *mr = NULL; + ibv_comp_channel *channel = NULL; + ibv_cq *cq = NULL; + ibv_port_attr portinfo; + uint8_t ib_port; + uint8_t gid_index; + uint16_t my_lid; + ibv_gid my_gid; + uint32_t mtu; + int max_cqe = 0; + int used_max_cqe = 0; + + static msgr_rdma_context_t *create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu); + ~msgr_rdma_context_t(); +}; + +struct msgr_rdma_connection_t +{ + msgr_rdma_context_t *ctx = NULL; + ibv_qp *qp = NULL; + msgr_rdma_address_t addr; + int max_send = 0, max_recv = 0, max_sge = 0; + int cur_send = 0, cur_recv = 0; + uint64_t max_msg = 0; + + int send_pos = 0, send_buf_pos = 0; + int recv_pos = 0, recv_buf_pos = 0; + std::vector recv_buffers; + + ~msgr_rdma_connection_t(); + static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg); + int connect(msgr_rdma_address_t *dest); +}; diff --git a/src/msgr_receive.cpp b/src/msgr_receive.cpp index e5859d70..873527b4 100644 --- a/src/msgr_receive.cpp +++ b/src/msgr_receive.cpp @@ -91,48 +91,9 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl) { if (cl->read_iov.iov_base == cl->in_buf) { - // Compose operation(s) from the buffer - int remain = 
result; - void *curbuf = cl->in_buf; - while (remain > 0) + if (!handle_read_buffer(cl, cl->in_buf, result)) { - if (!cl->read_op) - { - cl->read_op = new osd_op_t; - cl->read_op->peer_fd = cl->peer_fd; - cl->read_op->op_type = OSD_OP_IN; - cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE); - cl->read_remaining = OSD_PACKET_SIZE; - cl->read_state = CL_READ_HDR; - } - while (cl->recv_list.done < cl->recv_list.count && remain > 0) - { - iovec* cur = cl->recv_list.get_iovec(); - if (cur->iov_len > remain) - { - memcpy(cur->iov_base, curbuf, remain); - cl->read_remaining -= remain; - cur->iov_len -= remain; - cur->iov_base += remain; - remain = 0; - } - else - { - memcpy(cur->iov_base, curbuf, cur->iov_len); - curbuf += cur->iov_len; - cl->read_remaining -= cur->iov_len; - remain -= cur->iov_len; - cur->iov_len = 0; - cl->recv_list.done++; - } - } - if (cl->recv_list.done >= cl->recv_list.count) - { - if (!handle_finished_read(cl)) - { - goto fin; - } - } + goto fin; } } else @@ -159,6 +120,52 @@ fin: return ret; } +bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int remain) +{ + // Compose operation(s) from the buffer + while (remain > 0) + { + if (!cl->read_op) + { + cl->read_op = new osd_op_t; + cl->read_op->peer_fd = cl->peer_fd; + cl->read_op->op_type = OSD_OP_IN; + cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE); + cl->read_remaining = OSD_PACKET_SIZE; + cl->read_state = CL_READ_HDR; + } + while (cl->recv_list.done < cl->recv_list.count && remain > 0) + { + iovec* cur = cl->recv_list.get_iovec(); + if (cur->iov_len > remain) + { + memcpy(cur->iov_base, curbuf, remain); + cl->read_remaining -= remain; + cur->iov_len -= remain; + cur->iov_base += remain; + remain = 0; + } + else + { + memcpy(cur->iov_base, curbuf, cur->iov_len); + curbuf += cur->iov_len; + cl->read_remaining -= cur->iov_len; + remain -= cur->iov_len; + cur->iov_len = 0; + cl->recv_list.done++; + } + } + if (cl->recv_list.done >= 
cl->recv_list.count) + { + if (!handle_finished_read(cl)) + { + return false; + } + } + } + return true; +} + bool osd_messenger_t::handle_finished_read(osd_client_t *cl) { cl->recv_list.reset(); @@ -254,6 +261,16 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl) } cl->read_remaining = cur_op->req.rw.len; } + else if (cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG) + { + if (cur_op->req.show_conf.json_len > 0) + { + cur_op->buf = malloc_or_die(cur_op->req.show_conf.json_len+1); + ((uint8_t*)cur_op->buf)[cur_op->req.show_conf.json_len] = 0; + cl->recv_list.push_back(cur_op->buf, cur_op->req.show_conf.json_len); + } + cl->read_remaining = cur_op->req.show_conf.json_len; + } if (cl->read_remaining > 0) { // Read data @@ -338,11 +355,11 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) } else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0) { - assert(!op->iov.count); delete cl->read_op; cl->read_op = op; cl->read_state = CL_READ_REPLY_DATA; cl->read_remaining = op->reply.hdr.retval; + free(op->buf); op->buf = malloc_or_die(op->reply.hdr.retval); cl->recv_list.push_back(op->buf, op->reply.hdr.retval); } diff --git a/src/msgr_send.cpp b/src/msgr_send.cpp index 8bdaf197..972ec0bd 100644 --- a/src/msgr_send.cpp +++ b/src/msgr_send.cpp @@ -46,7 +46,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE }); cl->sent_ops[cur_op->req.hdr.id] = cur_op; } - to_outbox.push_back(NULL); + to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_HDR }); // Bitmap if (cur_op->op_type == OSD_OP_IN && cur_op->req.hdr.opcode == OSD_OP_SEC_READ && @@ -56,7 +56,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) .iov_base = cur_op->bitmap, .iov_len = cur_op->reply.sec_rw.attr_len, }); - to_outbox.push_back(NULL); + to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 }); } else if (cur_op->op_type == OSD_OP_OUT && 
(cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) && @@ -66,7 +66,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) .iov_base = cur_op->bitmap, .iov_len = cur_op->req.sec_rw.attr_len, }); - to_outbox.push_back(NULL); + to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 }); } // Operation data if ((cur_op->op_type == OSD_OP_IN @@ -78,13 +78,14 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE || cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE || - cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)) && cur_op->iov.count > 0) + cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK || + cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)) && cur_op->iov.count > 0) { for (int i = 0; i < cur_op->iov.count; i++) { assert(cur_op->iov.buf[i].iov_base); to_send_list.push_back(cur_op->iov.buf[i]); - to_outbox.push_back(NULL); + to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 }); } } if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) @@ -93,13 +94,19 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->reply.hdr.retval }); else if (cur_op->op_type == OSD_OP_OUT && cur_op->req.sec_read_bmp.len > 0) to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->req.sec_read_bmp.len }); - to_outbox.push_back(NULL); + to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 }); } if (cur_op->op_type == OSD_OP_IN) { - // To free it later - to_outbox[to_outbox.size()-1] = cur_op; + to_outbox[to_outbox.size()-1].flags |= MSGR_SENDP_FREE; } +#ifdef WITH_RDMA + if (cl->peer_state == PEER_RDMA) + { + try_send_rdma(cl); + return; + } +#endif if (!ringloop) { // FIXME: It's worse because it doesn't allow batching @@ -232,10 +239,10 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl) iovec & iov = 
cl->send_list[done]; if (iov.iov_len <= result) { - if (cl->outbox[done]) + if (cl->outbox[done].flags & MSGR_SENDP_FREE) { // Reply fully sent - delete cl->outbox[done]; + delete cl->outbox[done].op; } result -= iov.iov_len; done++; @@ -260,6 +267,21 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl) cl->next_outbox.clear(); } cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0; +#ifdef WITH_RDMA + if (cl->rdma_conn && !cl->outbox.size() && cl->peer_state == PEER_RDMA_CONNECTING) + { + // FIXME: Do something better than just forgetting the FD + // FIXME: Ignore pings during RDMA state transition + if (log_level > 0) + { + printf("Successfully connected with client %d using RDMA\n", cl->peer_fd); + } + cl->peer_state = PEER_RDMA; + tfd->set_fd_handler(cl->peer_fd, false, NULL); + // Add the initial receive request + try_recv_rdma(cl); + } +#endif } if (cl->write_state != 0) { diff --git a/src/msgr_stop.cpp b/src/msgr_stop.cpp index 5caa65ec..ae15a923 100644 --- a/src/msgr_stop.cpp +++ b/src/msgr_stop.cpp @@ -122,6 +122,12 @@ void osd_messenger_t::stop_client(int peer_fd, bool force) // And close the FD only when everything is done // ...because peer_fd number can get reused after close() close(peer_fd); +#ifdef WITH_RDMA + if (cl->rdma_conn) + { + delete cl->rdma_conn; + } +#endif #endif // Find the item again because it can be invalidated at this point it = clients.find(peer_fd); diff --git a/src/nbd_proxy.cpp b/src/nbd_proxy.cpp index 89f6fa59..2a0c5811 100644 --- a/src/nbd_proxy.cpp +++ b/src/nbd_proxy.cpp @@ -26,7 +26,10 @@ const char *exe_name = NULL; class nbd_proxy { protected: + std::string image_name; uint64_t inode = 0; + uint64_t device_size = 0; + inode_watch_t *watch = NULL; ring_loop_t *ringloop = NULL; epoll_manager_t *epmgr = NULL; @@ -111,9 +114,9 @@ public: { printf( "Vitastor NBD proxy\n" - "(c) Vitaliy Filippov, 2020 (VNPL-1.1)\n\n" + "(c) Vitaliy Filippov, 2020-2021 (VNPL-1.1)\n\n" "USAGE:\n" - " %s map --etcd_address 
--pool --inode --size \n" + " %s map [--etcd_address ] (--image | --pool --inode --size )\n" " %s unmap /dev/nbd0\n" " %s list [--json]\n", exe_name, exe_name, exe_name @@ -143,26 +146,49 @@ public: void start(json11::Json cfg) { // Check options - if (cfg["etcd_address"].string_value() == "") + if (cfg["image"].string_value() != "") { - fprintf(stderr, "etcd_address is missing\n"); - exit(1); + // Use image name + image_name = cfg["image"].string_value(); + inode = 0; } - if (!cfg["size"].uint64_value()) + else { - fprintf(stderr, "device size is missing\n"); - exit(1); + // Use pool, inode number and size + if (!cfg["size"].uint64_value()) + { + fprintf(stderr, "device size is missing\n"); + exit(1); + } + device_size = cfg["size"].uint64_value(); + inode = cfg["inode"].uint64_value(); + uint64_t pool = cfg["pool"].uint64_value(); + if (pool) + { + inode = (inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (pool << (64-POOL_ID_BITS)); + } + if (!(inode >> (64-POOL_ID_BITS))) + { + fprintf(stderr, "pool is missing\n"); + exit(1); + } } - inode = cfg["inode"].uint64_value(); - uint64_t pool = cfg["pool"].uint64_value(); - if (pool) + // Create client + ringloop = new ring_loop_t(512); + epmgr = new epoll_manager_t(ringloop); + cli = new cluster_client_t(ringloop, epmgr->tfd, cfg); + if (!inode) { - inode = (inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (pool << (64-POOL_ID_BITS)); - } - if (!(inode >> (64-POOL_ID_BITS))) - { - fprintf(stderr, "pool is missing\n"); - exit(1); + // Load image metadata + while (!cli->is_ready()) + { + ringloop->loop(); + if (cli->is_ready()) + break; + ringloop->wait(); + } + watch = cli->st_cli.watch_inode(image_name); + device_size = watch->cfg.size; } // Initialize NBD int sockfd[2]; @@ -176,7 +202,7 @@ public: load_module(); if (!cfg["dev_num"].is_null()) { - if (run_nbd(sockfd, cfg["dev_num"].int64_value(), cfg["size"].uint64_value(), NBD_FLAG_SEND_FLUSH, 30) < 0) + if (run_nbd(sockfd, cfg["dev_num"].int64_value(), device_size, 
NBD_FLAG_SEND_FLUSH, 30) < 0) { perror("run_nbd"); exit(1); @@ -188,7 +214,7 @@ public: int i = 0; while (true) { - int r = run_nbd(sockfd, i, cfg["size"].uint64_value(), NBD_FLAG_SEND_FLUSH, 30); + int r = run_nbd(sockfd, i, device_size, NBD_FLAG_SEND_FLUSH, 30); if (r == 0) { printf("/dev/nbd%d\n", i); @@ -215,10 +241,6 @@ public: { daemonize(); } - // Create client - ringloop = new ring_loop_t(512); - epmgr = new epoll_manager_t(ringloop); - cli = new cluster_client_t(ringloop, epmgr->tfd, cfg); // Initialize read state read_state = CL_READ_HDR; recv_buf = malloc_or_die(receive_buffer_size); @@ -242,6 +264,7 @@ public: ringloop->loop(); ringloop->wait(); } + // FIXME: Cleanup when exiting } void load_module() @@ -610,7 +633,7 @@ protected: if (req_type == NBD_CMD_READ || req_type == NBD_CMD_WRITE) { op->opcode = req_type == NBD_CMD_READ ? OSD_OP_READ : OSD_OP_WRITE; - op->inode = inode; + op->inode = inode ? inode : watch->cfg.num; op->offset = be64toh(cur_req.from); op->len = be32toh(cur_req.len); buf = malloc_or_die(sizeof(nbd_reply) + op->len); @@ -657,7 +680,15 @@ protected: } else { - cli->execute(cur_op); + if (cur_op->opcode == OSD_OP_WRITE && watch->cfg.readonly) + { + cur_op->retval = -EROFS; + std::function(cur_op->callback)(cur_op); + } + else + { + cli->execute(cur_op); + } cur_op = NULL; cur_buf = &cur_req; cur_left = sizeof(nbd_request); diff --git a/src/osd.cpp b/src/osd.cpp index 5ddb508f..7998c88c 100644 --- a/src/osd.cpp +++ b/src/osd.cpp @@ -10,31 +10,39 @@ #include "osd.h" #include "http_client.h" -osd_t::osd_t(blockstore_config_t & config, ring_loop_t *ringloop) +static blockstore_config_t json_to_bs(const json11::Json::object & config) { - bs_block_size = strtoull(config["block_size"].c_str(), NULL, 10); - bs_bitmap_granularity = strtoull(config["bitmap_granularity"].c_str(), NULL, 10); - if (!bs_block_size) - bs_block_size = DEFAULT_BLOCK_SIZE; - if (!bs_bitmap_granularity) - bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY; - 
clean_entry_bitmap_size = bs_block_size / bs_bitmap_granularity / 8; + blockstore_config_t bs; + for (auto kv: config) + { + if (kv.second.is_string()) + bs[kv.first] = kv.second.string_value(); + else + bs[kv.first] = kv.second.dump(); + } + return bs; +} +osd_t::osd_t(const json11::Json & config, ring_loop_t *ringloop) +{ zero_buffer_size = 1<<20; zero_buffer = malloc_or_die(zero_buffer_size); memset(zero_buffer, 0, zero_buffer_size); - this->config = config; this->ringloop = ringloop; + this->config = msgr.read_config(config).object_items(); + if (this->config.find("log_level") == this->config.end()) + this->config["log_level"] = 1; + parse_config(this->config); + epmgr = new epoll_manager_t(ringloop); // FIXME: Use timerfd_interval based directly on io_uring this->tfd = epmgr->tfd; // FIXME: Create Blockstore from on-disk superblock config and check it against the OSD cluster config - this->bs = new blockstore_t(config, ringloop, tfd); - - parse_config(config); + auto bs_cfg = json_to_bs(this->config); + this->bs = new blockstore_t(bs_cfg, ringloop, tfd); this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id) { @@ -45,11 +53,11 @@ osd_t::osd_t(blockstore_config_t & config, ring_loop_t *ringloop) print_slow(); }); - c_cli.tfd = this->tfd; - c_cli.ringloop = this->ringloop; - c_cli.exec_op = [this](osd_op_t *op) { exec_op(op); }; - c_cli.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); }; - c_cli.init(); + msgr.tfd = this->tfd; + msgr.ringloop = this->ringloop; + msgr.exec_op = [this](osd_op_t *op) { exec_op(op); }; + msgr.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); }; + msgr.init(); init_cluster(); @@ -66,62 +74,71 @@ osd_t::~osd_t() free(zero_buffer); } -void osd_t::parse_config(blockstore_config_t & config) +void osd_t::parse_config(const json11::Json & config) { - if (config.find("log_level") == config.end()) - config["log_level"] = "1"; - log_level = strtoull(config["log_level"].c_str(), NULL, 10); - // 
Initial startup configuration - json11::Json json_config = json11::Json(config); - st_cli.parse_config(json_config); - etcd_report_interval = strtoull(config["etcd_report_interval"].c_str(), NULL, 10); - if (etcd_report_interval <= 0) - etcd_report_interval = 30; - osd_num = strtoull(config["osd_num"].c_str(), NULL, 10); + st_cli.parse_config(config); + msgr.parse_config(config); + // OSD number + osd_num = config["osd_num"].uint64_value(); if (!osd_num) throw std::runtime_error("osd_num is required in the configuration"); - c_cli.osd_num = osd_num; + msgr.osd_num = osd_num; + // Vital Blockstore parameters + bs_block_size = config["block_size"].uint64_value(); + if (!bs_block_size) + bs_block_size = DEFAULT_BLOCK_SIZE; + bs_bitmap_granularity = config["bitmap_granularity"].uint64_value(); + if (!bs_bitmap_granularity) + bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY; + clean_entry_bitmap_size = bs_block_size / bs_bitmap_granularity / 8; + // Bind address + bind_address = config["bind_address"].string_value(); + if (bind_address == "") + bind_address = "0.0.0.0"; + bind_port = config["bind_port"].uint64_value(); + if (bind_port <= 0 || bind_port > 65535) + bind_port = 0; + // OSD configuration + log_level = config["log_level"].uint64_value(); + etcd_report_interval = config["etcd_report_interval"].uint64_value(); + if (etcd_report_interval <= 0) + etcd_report_interval = 30; + readonly = config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes"; run_primary = config["run_primary"] != "false" && config["run_primary"] != "0" && config["run_primary"] != "no"; no_rebalance = config["no_rebalance"] == "true" || config["no_rebalance"] == "1" || config["no_rebalance"] == "yes"; no_recovery = config["no_recovery"] == "true" || config["no_recovery"] == "1" || config["no_recovery"] == "yes"; - // Cluster configuration - bind_address = config["bind_address"]; - if (bind_address == "") - bind_address = "0.0.0.0"; - bind_port = 
stoull_full(config["bind_port"]); - if (bind_port <= 0 || bind_port > 65535) - bind_port = 0; + allow_test_ops = config["allow_test_ops"] == "true" || config["allow_test_ops"] == "1" || config["allow_test_ops"] == "yes"; if (config["immediate_commit"] == "all") immediate_commit = IMMEDIATE_ALL; else if (config["immediate_commit"] == "small") immediate_commit = IMMEDIATE_SMALL; - if (config.find("autosync_interval") != config.end()) + else + immediate_commit = IMMEDIATE_NONE; + if (!config["autosync_interval"].is_null()) { - autosync_interval = strtoull(config["autosync_interval"].c_str(), NULL, 10); + // Allow to set it to 0 + autosync_interval = config["autosync_interval"].uint64_value(); if (autosync_interval > MAX_AUTOSYNC_INTERVAL) autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; } - if (config.find("client_queue_depth") != config.end()) + if (!config["client_queue_depth"].is_null()) { - client_queue_depth = strtoull(config["client_queue_depth"].c_str(), NULL, 10); + client_queue_depth = config["client_queue_depth"].uint64_value(); if (client_queue_depth < 128) client_queue_depth = 128; } - recovery_queue_depth = strtoull(config["recovery_queue_depth"].c_str(), NULL, 10); + recovery_queue_depth = config["recovery_queue_depth"].uint64_value(); if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE) recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; - recovery_sync_batch = strtoull(config["recovery_sync_batch"].c_str(), NULL, 10); + recovery_sync_batch = config["recovery_sync_batch"].uint64_value(); if (recovery_sync_batch < 1 || recovery_sync_batch > MAX_RECOVERY_QUEUE) recovery_sync_batch = DEFAULT_RECOVERY_BATCH; - if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes") - readonly = true; - print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10); + print_stats_interval = config["print_stats_interval"].uint64_value(); if (!print_stats_interval) print_stats_interval = 3; - 
slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10); + slow_log_interval = config["slow_log_interval"].uint64_value(); if (!slow_log_interval) slow_log_interval = 10; - c_cli.parse_config(json_config); } void osd_t::bind_socket() @@ -174,7 +191,7 @@ void osd_t::bind_socket() epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events) { - c_cli.accept_connections(listen_fd); + msgr.accept_connections(listen_fd); }); } @@ -191,8 +208,8 @@ bool osd_t::shutdown() void osd_t::loop() { handle_peers(); - c_cli.read_requests(); - c_cli.send_replies(); + msgr.read_requests(); + msgr.send_replies(); ringloop->submit(); } @@ -276,7 +293,7 @@ void osd_t::exec_op(osd_op_t *cur_op) void osd_t::reset_stats() { - c_cli.stats = { 0 }; + msgr.stats = { 0 }; prev_stats = { 0 }; memset(recovery_stat_count, 0, sizeof(recovery_stat_count)); memset(recovery_stat_bytes, 0, sizeof(recovery_stat_bytes)); @@ -286,11 +303,11 @@ void osd_t::print_stats() { for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++) { - if (c_cli.stats.op_stat_count[i] != prev_stats.op_stat_count[i] && i != OSD_OP_PING) + if (msgr.stats.op_stat_count[i] != prev_stats.op_stat_count[i] && i != OSD_OP_PING) { - uint64_t avg = (c_cli.stats.op_stat_sum[i] - prev_stats.op_stat_sum[i])/(c_cli.stats.op_stat_count[i] - prev_stats.op_stat_count[i]); - uint64_t bw = (c_cli.stats.op_stat_bytes[i] - prev_stats.op_stat_bytes[i]) / print_stats_interval; - if (c_cli.stats.op_stat_bytes[i] != 0) + uint64_t avg = (msgr.stats.op_stat_sum[i] - prev_stats.op_stat_sum[i])/(msgr.stats.op_stat_count[i] - prev_stats.op_stat_count[i]); + uint64_t bw = (msgr.stats.op_stat_bytes[i] - prev_stats.op_stat_bytes[i]) / print_stats_interval; + if (msgr.stats.op_stat_bytes[i] != 0) { printf( "[OSD %lu] avg latency for op %d (%s): %lu us, B/W: %.2f %s\n", osd_num, i, osd_op_names[i], avg, @@ -302,19 +319,19 @@ void osd_t::print_stats() { printf("[OSD %lu] avg latency for op %d (%s): %lu us\n", osd_num, i, osd_op_names[i], avg); 
} - prev_stats.op_stat_count[i] = c_cli.stats.op_stat_count[i]; - prev_stats.op_stat_sum[i] = c_cli.stats.op_stat_sum[i]; - prev_stats.op_stat_bytes[i] = c_cli.stats.op_stat_bytes[i]; + prev_stats.op_stat_count[i] = msgr.stats.op_stat_count[i]; + prev_stats.op_stat_sum[i] = msgr.stats.op_stat_sum[i]; + prev_stats.op_stat_bytes[i] = msgr.stats.op_stat_bytes[i]; } } for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++) { - if (c_cli.stats.subop_stat_count[i] != prev_stats.subop_stat_count[i]) + if (msgr.stats.subop_stat_count[i] != prev_stats.subop_stat_count[i]) { - uint64_t avg = (c_cli.stats.subop_stat_sum[i] - prev_stats.subop_stat_sum[i])/(c_cli.stats.subop_stat_count[i] - prev_stats.subop_stat_count[i]); + uint64_t avg = (msgr.stats.subop_stat_sum[i] - prev_stats.subop_stat_sum[i])/(msgr.stats.subop_stat_count[i] - prev_stats.subop_stat_count[i]); printf("[OSD %lu] avg latency for subop %d (%s): %ld us\n", osd_num, i, osd_op_names[i], avg); - prev_stats.subop_stat_count[i] = c_cli.stats.subop_stat_count[i]; - prev_stats.subop_stat_sum[i] = c_cli.stats.subop_stat_sum[i]; + prev_stats.subop_stat_count[i] = msgr.stats.subop_stat_count[i]; + prev_stats.subop_stat_sum[i] = msgr.stats.subop_stat_sum[i]; } } for (int i = 0; i < 2; i++) @@ -351,7 +368,7 @@ void osd_t::print_slow() char alloc[1024]; timespec now; clock_gettime(CLOCK_REALTIME, &now); - for (auto & kv: c_cli.clients) + for (auto & kv: msgr.clients) { for (auto op: kv.second->received_ops) { diff --git a/src/osd.h b/src/osd.h index 7a67c6e9..c8c5f44b 100644 --- a/src/osd.h +++ b/src/osd.h @@ -92,7 +92,7 @@ class osd_t { // config - blockstore_config_t config; + json11::Json::object config; int etcd_report_interval = 30; bool readonly = false; @@ -104,7 +104,7 @@ class osd_t int bind_port, listen_backlog; // FIXME: Implement client queue depth limit int client_queue_depth = 128; - bool allow_test_ops = true; + bool allow_test_ops = false; int print_stats_interval = 3; int slow_log_interval = 10; int 
immediate_commit = IMMEDIATE_NONE; @@ -116,7 +116,7 @@ class osd_t // cluster state etcd_state_client_t st_cli; - osd_messenger_t c_cli; + osd_messenger_t msgr; int etcd_failed_attempts = 0; std::string etcd_lease_id; json11::Json self_state; @@ -167,7 +167,7 @@ class osd_t uint64_t recovery_stat_bytes[2][2] = { 0 }; // cluster connection - void parse_config(blockstore_config_t & config); + void parse_config(const json11::Json & config); void init_cluster(); void on_change_osd_state_hook(osd_num_t peer_osd); void on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num); @@ -268,7 +268,7 @@ class osd_t } public: - osd_t(blockstore_config_t & config, ring_loop_t *ringloop); + osd_t(const json11::Json & config, ring_loop_t *ringloop); ~osd_t(); void force_stop(int exitcode); bool shutdown(); diff --git a/src/osd_cluster.cpp b/src/osd_cluster.cpp index 7fc5500f..18f38c6e 100644 --- a/src/osd_cluster.cpp +++ b/src/osd_cluster.cpp @@ -21,7 +21,7 @@ void osd_t::init_cluster() { // Test version of clustering code with 1 pool, 1 PG and 2 peers // Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205 - std::string peerstr = config["peers"]; + std::string peerstr = config["peers"].string_value(); while (peerstr.size()) { int pos = peerstr.find(','); @@ -104,7 +104,7 @@ void osd_t::parse_test_peer(std::string peer) { "addresses", json11::Json::array { addr } }, { "port", port }, }; - c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]); + msgr.connect_peer(peer_osd, st_cli.peer_states[peer_osd]); } json11::Json osd_t::get_osd_state() @@ -146,16 +146,16 @@ json11::Json osd_t::get_statistics() for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++) { op_stats[osd_op_names[i]] = json11::Json::object { - { "count", c_cli.stats.op_stat_count[i] }, - { "usec", c_cli.stats.op_stat_sum[i] }, - { "bytes", c_cli.stats.op_stat_bytes[i] }, + { "count", msgr.stats.op_stat_count[i] }, + { "usec", msgr.stats.op_stat_sum[i] }, + { "bytes", msgr.stats.op_stat_bytes[i] }, }; } for (int i = 
OSD_OP_MIN; i <= OSD_OP_MAX; i++) { subop_stats[osd_op_names[i]] = json11::Json::object { - { "count", c_cli.stats.subop_stat_count[i] }, - { "usec", c_cli.stats.subop_stat_sum[i] }, + { "count", msgr.stats.subop_stat_count[i] }, + { "usec", msgr.stats.subop_stat_sum[i] }, }; } st["op_stats"] = op_stats; @@ -298,9 +298,9 @@ void osd_t::report_statistics() void osd_t::on_change_osd_state_hook(osd_num_t peer_osd) { - if (c_cli.wanted_peers.find(peer_osd) != c_cli.wanted_peers.end()) + if (msgr.wanted_peers.find(peer_osd) != msgr.wanted_peers.end()) { - c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]); + msgr.connect_peer(peer_osd, st_cli.peer_states[peer_osd]); } } @@ -340,21 +340,10 @@ void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num) void osd_t::on_load_config_hook(json11::Json::object & global_config) { - blockstore_config_t osd_config = this->config; - for (auto & cfg_var: global_config) - { - if (this->config.find(cfg_var.first) == this->config.end()) - { - if (cfg_var.second.is_string()) - { - osd_config[cfg_var.first] = cfg_var.second.string_value(); - } - else - { - osd_config[cfg_var.first] = cfg_var.second.dump(); - } - } - } + json11::Json::object osd_config = this->config; + for (auto & kv: global_config) + if (osd_config.find(kv.first) == osd_config.end()) + osd_config[kv.first] = kv.second; parse_config(osd_config); bind_socket(); acquire_lease(); @@ -380,7 +369,7 @@ void osd_t::acquire_lease() etcd_lease_id = data["ID"].string_value(); create_osd_state(); }); - printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, config["etcd_address"].c_str(), etcd_report_interval); + printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, config["etcd_address"].string_value().c_str(), etcd_report_interval); tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id) { renew_lease(); @@ -695,9 +684,9 @@ void osd_t::apply_pg_config() // Add peers for (auto pg_osd: all_peers) { - 
if (pg_osd != this->osd_num && c_cli.osd_peer_fds.find(pg_osd) == c_cli.osd_peer_fds.end()) + if (pg_osd != this->osd_num && msgr.osd_peer_fds.find(pg_osd) == msgr.osd_peer_fds.end()) { - c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); + msgr.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); } } start_pg_peering(pg); diff --git a/src/osd_flush.cpp b/src/osd_flush.cpp index 62ab408f..2217c666 100644 --- a/src/osd_flush.cpp +++ b/src/osd_flush.cpp @@ -82,10 +82,10 @@ void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, p else { printf("Error while doing flush on OSD %lu: %d (%s)\n", osd_num, retval, strerror(-retval)); - auto fd_it = c_cli.osd_peer_fds.find(peer_osd); - if (fd_it != c_cli.osd_peer_fds.end()) + auto fd_it = msgr.osd_peer_fds.find(peer_osd); + if (fd_it != msgr.osd_peer_fds.end()) { - c_cli.stop_client(fd_it->second); + msgr.stop_client(fd_it->second); } return; } @@ -188,7 +188,7 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t else { // Peer - int peer_fd = c_cli.osd_peer_fds[peer_osd]; + int peer_fd = msgr.osd_peer_fds[peer_osd]; op->op_type = OSD_OP_OUT; op->iov.push_back(op->buf, count * sizeof(obj_ver_id)); op->peer_fd = peer_fd; @@ -196,7 +196,7 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t .sec_stab = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = (uint64_t)(rollback ? 
OSD_OP_SEC_ROLLBACK : OSD_OP_SEC_STABILIZE), }, .len = count * sizeof(obj_ver_id), @@ -207,7 +207,7 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval); delete op; }; - c_cli.outbox_push(op); + msgr.outbox_push(op); } } diff --git a/src/osd_main.cpp b/src/osd_main.cpp index a39326c9..336c69be 100644 --- a/src/osd_main.cpp +++ b/src/osd_main.cpp @@ -29,13 +29,13 @@ int main(int narg, char *args[]) perror("BUG: too small packet size"); return 1; } - blockstore_config_t config; + json11::Json::object config; for (int i = 1; i < narg; i++) { if (args[i][0] == '-' && args[i][1] == '-' && i < narg-1) { char *opt = args[i]+2; - config[opt] = args[++i]; + config[std::string(opt)] = std::string(args[++i]); } } signal(SIGINT, handle_sigint); diff --git a/src/osd_ops.h b/src/osd_ops.h index 76b3ad3a..e8078b71 100644 --- a/src/osd_ops.h +++ b/src/osd_ops.h @@ -148,6 +148,8 @@ struct __attribute__((__packed__)) osd_reply_sec_read_bmp_t struct __attribute__((__packed__)) osd_op_show_config_t { osd_op_header_t header; + // JSON request length + uint64_t json_len; }; struct __attribute__((__packed__)) osd_reply_show_config_t diff --git a/src/osd_peering.cpp b/src/osd_peering.cpp index d8fac206..1d2c14ae 100644 --- a/src/osd_peering.cpp +++ b/src/osd_peering.cpp @@ -156,7 +156,7 @@ void osd_t::start_pg_peering(pg_t & pg) if (immediate_commit != IMMEDIATE_ALL) { std::vector to_stop; - for (auto & cp: c_cli.clients) + for (auto & cp: msgr.clients) { if (cp.second->dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second->dirty_pgs.end()) { @@ -165,7 +165,7 @@ void osd_t::start_pg_peering(pg_t & pg) } for (auto peer_fd: to_stop) { - c_cli.stop_client(peer_fd); + msgr.stop_client(peer_fd); } } // Calculate current write OSD set @@ -175,7 +175,7 @@ void osd_t::start_pg_peering(pg_t & pg) for (int role = 0; role < 
pg.target_set.size(); role++) { pg.cur_set[role] = pg.target_set[role] == this->osd_num || - c_cli.osd_peer_fds.find(pg.target_set[role]) != c_cli.osd_peer_fds.end() ? pg.target_set[role] : 0; + msgr.osd_peer_fds.find(pg.target_set[role]) != msgr.osd_peer_fds.end() ? pg.target_set[role] : 0; if (pg.cur_set[role] != 0) { pg.pg_cursize++; @@ -199,7 +199,7 @@ void osd_t::start_pg_peering(pg_t & pg) { found = false; if (history_osd == this->osd_num || - c_cli.osd_peer_fds.find(history_osd) != c_cli.osd_peer_fds.end()) + msgr.osd_peer_fds.find(history_osd) != msgr.osd_peer_fds.end()) { found = true; break; @@ -223,13 +223,13 @@ void osd_t::start_pg_peering(pg_t & pg) std::set cur_peers; for (auto pg_osd: pg.all_peers) { - if (pg_osd == this->osd_num || c_cli.osd_peer_fds.find(pg_osd) != c_cli.osd_peer_fds.end()) + if (pg_osd == this->osd_num || msgr.osd_peer_fds.find(pg_osd) != msgr.osd_peer_fds.end()) { cur_peers.insert(pg_osd); } - else if (c_cli.wanted_peers.find(pg_osd) == c_cli.wanted_peers.end()) + else if (msgr.wanted_peers.find(pg_osd) == msgr.wanted_peers.end()) { - c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); + msgr.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); } } pg.cur_peers.insert(pg.cur_peers.begin(), cur_peers.begin(), cur_peers.end()); @@ -325,7 +325,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p else { // Peer - auto & cl = c_cli.clients.at(c_cli.osd_peer_fds[role_osd]); + auto & cl = msgr.clients.at(msgr.osd_peer_fds[role_osd]); osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; op->peer_fd = cl->peer_fd; @@ -333,7 +333,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p .sec_sync = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = OSD_OP_SEC_SYNC, }, }, @@ -347,14 +347,14 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p int fail_fd = op->peer_fd; 
ps->list_ops.erase(role_osd); delete op; - c_cli.stop_client(fail_fd); + msgr.stop_client(fail_fd); return; } delete op; ps->list_ops.erase(role_osd); submit_list_subop(role_osd, ps); }; - c_cli.outbox_push(op); + msgr.outbox_push(op); ps->list_ops[role_osd] = op; } } @@ -404,12 +404,12 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) // Peer osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; - op->peer_fd = c_cli.osd_peer_fds[role_osd]; + op->peer_fd = msgr.osd_peer_fds[role_osd]; op->req = (osd_any_op_t){ .sec_list = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = OSD_OP_SEC_LIST, }, .list_pg = ps->pg_num, @@ -427,7 +427,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) int fail_fd = op->peer_fd; ps->list_ops.erase(role_osd); delete op; - c_cli.stop_client(fail_fd); + msgr.stop_client(fail_fd); return; } printf( @@ -444,7 +444,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) ps->list_ops.erase(role_osd); delete op; }; - c_cli.outbox_push(op); + msgr.outbox_push(op); ps->list_ops[role_osd] = op; } } diff --git a/src/osd_primary_chain.cpp b/src/osd_primary_chain.cpp index 7d0e6912..e9a2fbfb 100644 --- a/src/osd_primary_chain.cpp +++ b/src/osd_primary_chain.cpp @@ -236,14 +236,14 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg) // Send to a remote OSD osd_op_t *subop = op_data->subops+subop_idx; subop->op_type = OSD_OP_OUT; - subop->peer_fd = c_cli.osd_peer_fds.at(subop_osd_num); + subop->peer_fd = msgr.osd_peer_fds.at(subop_osd_num); // FIXME: Use the pre-allocated buffer subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev)); subop->req = (osd_any_op_t){ .sec_read_bmp = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = OSD_OP_SEC_READ_BMP, }, .len = sizeof(obj_ver_id)*(i+1-prev), @@ -273,7 +273,7 @@ int 
osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg) } handle_primary_subop(subop, cur_op); }; - c_cli.outbox_push(subop); + msgr.outbox_push(subop); subop_idx++; } prev = i+1; diff --git a/src/osd_primary_subops.cpp b/src/osd_primary_subops.cpp index 68f8ae73..9b8eff08 100644 --- a/src/osd_primary_subops.cpp +++ b/src/osd_primary_subops.cpp @@ -87,14 +87,14 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval) else { // FIXME add separate magic number for primary ops - auto cl_it = c_cli.clients.find(cur_op->peer_fd); - if (cl_it != c_cli.clients.end()) + auto cl_it = msgr.clients.find(cur_op->peer_fd); + if (cl_it != msgr.clients.end()) { cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; cur_op->reply.hdr.id = cur_op->req.hdr.id; cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; cur_op->reply.hdr.retval = retval; - c_cli.outbox_push(cur_op); + msgr.outbox_push(cur_op); } else { @@ -184,13 +184,13 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o else { subop->op_type = OSD_OP_OUT; - subop->peer_fd = c_cli.osd_peer_fds.at(role_osd_num); + subop->peer_fd = msgr.osd_peer_fds.at(role_osd_num); subop->bitmap = stripes[stripe_num].bmp_buf; subop->bitmap_len = clean_entry_bitmap_size; subop->req.sec_rw = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = (uint64_t)(wr ? (rep ? 
OSD_OP_SEC_WRITE_STABLE : OSD_OP_SEC_WRITE) : OSD_OP_SEC_READ), }, .oid = { @@ -227,7 +227,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o { handle_primary_subop(subop, cur_op); }; - c_cli.outbox_push(subop); + msgr.outbox_push(subop); } i++; } @@ -282,20 +282,20 @@ void osd_t::add_bs_subop_stats(osd_op_t *subop) uint64_t opcode = bs_op_to_osd_op[subop->bs_op->opcode]; timespec tv_end; clock_gettime(CLOCK_REALTIME, &tv_end); - c_cli.stats.op_stat_count[opcode]++; - if (!c_cli.stats.op_stat_count[opcode]) + msgr.stats.op_stat_count[opcode]++; + if (!msgr.stats.op_stat_count[opcode]) { - c_cli.stats.op_stat_count[opcode] = 1; - c_cli.stats.op_stat_sum[opcode] = 0; - c_cli.stats.op_stat_bytes[opcode] = 0; + msgr.stats.op_stat_count[opcode] = 1; + msgr.stats.op_stat_sum[opcode] = 0; + msgr.stats.op_stat_bytes[opcode] = 0; } - c_cli.stats.op_stat_sum[opcode] += ( + msgr.stats.op_stat_sum[opcode] += ( (tv_end.tv_sec - subop->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - subop->tv_begin.tv_nsec)/1000 ); if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE) { - c_cli.stats.op_stat_bytes[opcode] += subop->bs_op->len; + msgr.stats.op_stat_bytes[opcode] += subop->bs_op->len; } } @@ -322,7 +322,7 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) if (subop->peer_fd >= 0) { // Drop connection on any error - c_cli.stop_client(subop->peer_fd); + msgr.stop_client(subop->peer_fd); } } else @@ -332,8 +332,8 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) { uint64_t version = subop->reply.sec_rw.version; #ifdef OSD_DEBUG - uint64_t peer_osd = c_cli.clients.find(subop->peer_fd) != c_cli.clients.end() - ? c_cli.clients[subop->peer_fd]->osd_num : osd_num; + uint64_t peer_osd = msgr.clients.find(subop->peer_fd) != msgr.clients.end() + ? 
msgr.clients[subop->peer_fd]->osd_num : osd_num; printf("subop %lu from osd %lu: version = %lu\n", opcode, peer_osd, version); #endif if (op_data->fact_ver != UINT64_MAX) @@ -465,11 +465,11 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_ else { subops[i].op_type = OSD_OP_OUT; - subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num); + subops[i].peer_fd = msgr.osd_peer_fds.at(chunk.osd_num); subops[i].req = (osd_any_op_t){ .sec_del = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = OSD_OP_SEC_DELETE, }, .oid = chunk.oid, @@ -479,7 +479,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_ { handle_primary_subop(subop, cur_op); }; - c_cli.outbox_push(&subops[i]); + msgr.outbox_push(&subops[i]); } } } @@ -509,14 +509,14 @@ int osd_t::submit_primary_sync_subops(osd_op_t *cur_op) }); bs->enqueue_op(subops[i].bs_op); } - else if ((peer_it = c_cli.osd_peer_fds.find(sync_osd)) != c_cli.osd_peer_fds.end()) + else if ((peer_it = msgr.osd_peer_fds.find(sync_osd)) != msgr.osd_peer_fds.end()) { subops[i].op_type = OSD_OP_OUT; subops[i].peer_fd = peer_it->second; subops[i].req = (osd_any_op_t){ .sec_sync = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = OSD_OP_SEC_SYNC, }, } }; @@ -524,7 +524,7 @@ int osd_t::submit_primary_sync_subops(osd_op_t *cur_op) { handle_primary_subop(subop, cur_op); }; - c_cli.outbox_push(&subops[i]); + msgr.outbox_push(&subops[i]); } else { @@ -569,11 +569,11 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) else { subops[i].op_type = OSD_OP_OUT; - subops[i].peer_fd = c_cli.osd_peer_fds.at(stab_osd.osd_num); + subops[i].peer_fd = msgr.osd_peer_fds.at(stab_osd.osd_num); subops[i].req = (osd_any_op_t){ .sec_stab = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = 
OSD_OP_SEC_STABILIZE, }, .len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)), @@ -583,7 +583,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) { handle_primary_subop(subop, cur_op); }; - c_cli.outbox_push(&subops[i]); + msgr.outbox_push(&subops[i]); } } } diff --git a/src/osd_primary_sync.cpp b/src/osd_primary_sync.cpp index 376181eb..3bd06919 100644 --- a/src/osd_primary_sync.cpp +++ b/src/osd_primary_sync.cpp @@ -247,8 +247,8 @@ resume_8: finish: if (cur_op->peer_fd) { - auto it = c_cli.clients.find(cur_op->peer_fd); - if (it != c_cli.clients.end()) + auto it = msgr.clients.find(cur_op->peer_fd); + if (it != msgr.clients.end()) it->second->dirty_pgs.clear(); } finish_op(cur_op, 0); diff --git a/src/osd_primary_write.cpp b/src/osd_primary_write.cpp index 4b76ca9c..d1e08f81 100644 --- a/src/osd_primary_write.cpp +++ b/src/osd_primary_write.cpp @@ -370,8 +370,8 @@ lazy: } // Remember PG as dirty to drop the connection when PG goes offline // (this is required because of the "lazy sync") - auto cl_it = c_cli.clients.find(cur_op->peer_fd); - if (cl_it != c_cli.clients.end()) + auto cl_it = msgr.clients.find(cur_op->peer_fd); + if (cl_it != msgr.clients.end()) { cl_it->second->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); } diff --git a/src/osd_secondary.cpp b/src/osd_secondary.cpp index bb11269a..3487bedc 100644 --- a/src/osd_secondary.cpp +++ b/src/osd_secondary.cpp @@ -144,10 +144,44 @@ void osd_t::exec_secondary(osd_op_t *cur_op) void osd_t::exec_show_config(osd_op_t *cur_op) { - // FIXME: Send the real config, not its source - auto cfg_copy = config; - cfg_copy["protocol_version"] = std::to_string(OSD_PROTOCOL_VERSION); - std::string cfg_str = json11::Json(cfg_copy).dump(); + std::string json_err; + json11::Json req_json = cur_op->req.show_conf.json_len > 0 + ? 
json11::Json::parse(std::string((char *)cur_op->buf), json_err) + : json11::Json(); + // Expose sensitive configuration values so peers can check them + json11::Json::object wire_config = json11::Json::object { + { "osd_num", osd_num }, + { "protocol_version", OSD_PROTOCOL_VERSION }, + { "block_size", (uint64_t)bs_block_size }, + { "bitmap_granularity", (uint64_t)bs_bitmap_granularity }, + { "primary_enabled", run_primary }, + { "blockstore_enabled", bs ? true : false }, + { "readonly", readonly }, + { "immediate_commit", (immediate_commit == IMMEDIATE_ALL ? "all" : + (immediate_commit == IMMEDIATE_SMALL ? "small" : "none")) }, + { "lease_timeout", etcd_report_interval+(MAX_ETCD_ATTEMPTS*(2*ETCD_QUICK_TIMEOUT)+999)/1000 }, + }; +#ifdef WITH_RDMA + if (msgr.is_rdma_enabled()) + { + // Indicate that RDMA is enabled + wire_config["rdma_enabled"] = true; + if (req_json["connect_rdma"].is_string()) + { + // Peer is trying to connect using RDMA, try to satisfy him + bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value(), req_json["rdma_max_msg"].uint64_value()); + if (ok) + { + auto rc = msgr.clients.at(cur_op->peer_fd)->rdma_conn; + wire_config["rdma_address"] = rc->addr.to_string(); + wire_config["rdma_max_msg"] = rc->max_msg; + } + } + } +#endif + if (cur_op->buf) + free(cur_op->buf); + std::string cfg_str = json11::Json(wire_config).dump(); cur_op->buf = malloc_or_die(cfg_str.size()+1); memcpy(cur_op->buf, cfg_str.c_str(), cfg_str.size()+1); cur_op->iov.push_back(cur_op->buf, cfg_str.size()+1); diff --git a/src/qemu_driver.c b/src/qemu_driver.c index 5f7865ec..07944878 100644 --- a/src/qemu_driver.c +++ b/src/qemu_driver.c @@ -40,6 +40,7 @@ typedef struct VitastorClient { void *proxy; void *watch; + char *config_path; char *etcd_host; char *etcd_prefix; char *image; @@ -47,6 +48,10 @@ typedef struct VitastorClient uint64_t pool; uint64_t size; long readonly; + char *rdma_device; + int rdma_port_num; + int rdma_gid_index; + int rdma_mtu; 
QemuMutex mutex; } VitastorClient; @@ -95,7 +100,8 @@ static void qemu_rbd_unescape(char *src) } // vitastor[:key=value]* -// vitastor:etcd_host=127.0.0.1:inode=1:pool=1 +// vitastor[:etcd_host=127.0.0.1]:inode=1:pool=1[:rdma_gid_index=3] +// vitastor:config_path=/etc/vitastor/vitastor.conf:image=testimg static void vitastor_parse_filename(const char *filename, QDict *options, Error **errp) { const char *start; @@ -123,7 +129,12 @@ static void vitastor_parse_filename(const char *filename, QDict *options, Error qemu_rbd_unescape(name); value = qemu_rbd_next_tok(p, ':', &p); qemu_rbd_unescape(value); - if (!strcmp(name, "inode") || !strcmp(name, "pool") || !strcmp(name, "size")) + if (!strcmp(name, "inode") || + !strcmp(name, "pool") || + !strcmp(name, "size") || + !strcmp(name, "rdma_port_num") || + !strcmp(name, "rdma_gid_index") || + !strcmp(name, "rdma_mtu")) { unsigned long long num_val; if (parse_uint_full(value, &num_val, 0)) @@ -157,11 +168,6 @@ static void vitastor_parse_filename(const char *filename, QDict *options, Error goto out; } } - if (!qdict_get_str(options, "etcd_host")) - { - error_setg(errp, "etcd_host is missing"); - goto out; - } out: g_free(buf); @@ -189,9 +195,17 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E VitastorClient *client = bs->opaque; int64_t ret = 0; qemu_mutex_init(&client->mutex); + client->config_path = g_strdup(qdict_get_try_str(options, "config_path")); client->etcd_host = g_strdup(qdict_get_try_str(options, "etcd_host")); client->etcd_prefix = g_strdup(qdict_get_try_str(options, "etcd_prefix")); - client->proxy = vitastor_proxy_create(bdrv_get_aio_context(bs), client->etcd_host, client->etcd_prefix); + client->rdma_device = g_strdup(qdict_get_try_str(options, "rdma_device")); + client->rdma_port_num = qdict_get_try_int(options, "rdma_port_num", 0); + client->rdma_gid_index = qdict_get_try_int(options, "rdma_gid_index", 0); + client->rdma_mtu = qdict_get_try_int(options, "rdma_mtu", 0); + 
client->proxy = vitastor_proxy_create( + bdrv_get_aio_context(bs), client->config_path, client->etcd_host, client->etcd_prefix, + client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu + ); client->image = g_strdup(qdict_get_try_str(options, "image")); client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0; if (client->image) @@ -241,6 +255,11 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E } bs->total_sectors = client->size / BDRV_SECTOR_SIZE; //client->aio_context = bdrv_get_aio_context(bs); + qdict_del(options, "rdma_mtu"); + qdict_del(options, "rdma_gid_index"); + qdict_del(options, "rdma_port_num"); + qdict_del(options, "rdma_device"); + qdict_del(options, "config_path"); qdict_del(options, "etcd_host"); qdict_del(options, "etcd_prefix"); qdict_del(options, "image"); @@ -255,7 +274,10 @@ static void vitastor_close(BlockDriverState *bs) VitastorClient *client = bs->opaque; vitastor_proxy_destroy(client->proxy); qemu_mutex_destroy(&client->mutex); - g_free(client->etcd_host); + if (client->config_path) + g_free(client->config_path); + if (client->etcd_host) + g_free(client->etcd_host); if (client->etcd_prefix) g_free(client->etcd_prefix); if (client->image) @@ -478,6 +500,7 @@ static QEMUOptionParameter vitastor_create_opts[] = { static const char *vitastor_strong_runtime_opts[] = { "inode", "pool", + "config_path", "etcd_host", "etcd_prefix", diff --git a/src/qemu_proxy.cpp b/src/qemu_proxy.cpp index e65ce72f..28bebc45 100644 --- a/src/qemu_proxy.cpp +++ b/src/qemu_proxy.cpp @@ -34,15 +34,28 @@ public: cluster_client_t *cli; AioContext *ctx; - QemuProxy(AioContext *ctx, const char *etcd_host, const char *etcd_prefix) + QemuProxy(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix, + const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu) { this->ctx = ctx; - json11::Json cfg = json11::Json::object { - { "etcd_address", std::string(etcd_host) }, 
- { "etcd_prefix", std::string(etcd_prefix ? etcd_prefix : "/vitastor") }, - }; + json11::Json::object cfg; + if (config_path) + cfg["config_path"] = std::string(config_path); + if (etcd_host) + cfg["etcd_address"] = std::string(etcd_host); + if (etcd_prefix) + cfg["etcd_prefix"] = std::string(etcd_prefix); + if (rdma_device) + cfg["rdma_device"] = std::string(rdma_device); + if (rdma_port_num) + cfg["rdma_port_num"] = rdma_port_num; + if (rdma_gid_index) + cfg["rdma_gid_index"] = rdma_gid_index; + if (rdma_mtu) + cfg["rdma_mtu"] = rdma_mtu; + json11::Json cfg_json(cfg); tfd = new timerfd_manager_t([this](int fd, bool wr, std::function callback) { set_fd_handler(fd, wr, callback); }); - cli = new cluster_client_t(NULL, tfd, cfg); + cli = new cluster_client_t(NULL, tfd, cfg_json); } ~QemuProxy() @@ -80,9 +93,10 @@ public: extern "C" { -void* vitastor_proxy_create(AioContext *ctx, const char *etcd_host, const char *etcd_prefix) +void* vitastor_proxy_create(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix, + const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu) { - QemuProxy *p = new QemuProxy(ctx, etcd_host, etcd_prefix); + QemuProxy *p = new QemuProxy(ctx, config_path, etcd_host, etcd_prefix, rdma_device, rdma_port_num, rdma_gid_index, rdma_mtu); return p; } diff --git a/src/qemu_proxy.h b/src/qemu_proxy.h index 148f7d38..7940c819 100644 --- a/src/qemu_proxy.h +++ b/src/qemu_proxy.h @@ -16,7 +16,8 @@ extern "C" { // Our exports typedef void VitastorIOHandler(long retval, void *opaque); -void* vitastor_proxy_create(AioContext *ctx, const char *etcd_host, const char *etcd_prefix); +void* vitastor_proxy_create(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix, + const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu); void vitastor_proxy_destroy(void *client); void vitastor_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, 
uint64_t len, struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque); diff --git a/src/rm_inode.cpp b/src/rm_inode.cpp index 6cae5c7f..cfc3132c 100644 --- a/src/rm_inode.cpp +++ b/src/rm_inode.cpp @@ -87,7 +87,7 @@ public: "Vitastor inode removal tool\n" "(c) Vitaliy Filippov, 2020 (VNPL-1.1)\n\n" "USAGE:\n" - " %s --etcd_address --pool --inode [--wait-list]\n", + " %s [--etcd_address ] --pool --inode [--wait-list]\n", exe_name ); exit(0); @@ -95,11 +95,6 @@ public: void run(json11::Json cfg) { - if (cfg["etcd_address"].string_value() == "") - { - fprintf(stderr, "etcd_address is missing\n"); - exit(1); - } inode = cfg["inode"].uint64_value(); pool_id = cfg["pool"].uint64_value(); if (pool_id) diff --git a/tests/test_vm_start.sh b/tests/test_vm_start.sh index bdcdf29c..f2b1f3e2 100755 --- a/tests/test_vm_start.sh +++ b/tests/test_vm_start.sh @@ -46,7 +46,7 @@ $ETCDCTL put /vitastor/config/inode/1/1 '{"name":"debian9@0","size":'$((2048*102 $ETCDCTL put /vitastor/config/inode/1/2 '{"parent_id":1,"name":"debian9","size":'$((2048*1024*1024))'}' qemu-system-x86_64 -enable-kvm -m 1024 \ - -drive 'file=vitastor:etcd_host=127.0.0.1\:$ETCD_PORT/v3:image=debian9',format=raw,if=none,id=drive-virtio-disk0,cache=none \ + -drive 'file=vitastor:etcd_host=127.0.0.1\:'$ETCD_PORT'/v3:image=debian9',format=raw,if=none,id=drive-virtio-disk0,cache=none \ -device virtio-blk-pci,scsi=off,bus=pci.0,addr=0x5,drive=drive-virtio-disk0,id=virtio-disk0,bootindex=1,write-cache=off,physical_block_size=4096,logical_block_size=512 \ -vnc 0.0.0.0:0