Compare commits

..

1 Commits

Author SHA1 Message Date
64bbf121b6 Experiment: zero-copy TCP send
Some checks failed
Test / test_minsize_1 (push) Successful in 13s
Test / test_snapshot_ec (push) Successful in 32s
Test / test_move_reappear (push) Successful in 20s
Test / test_rm (push) Successful in 17s
Test / test_snapshot_down (push) Successful in 29s
Test / test_snapshot_down_ec (push) Successful in 31s
Test / test_splitbrain (push) Successful in 25s
Test / test_snapshot_chain (push) Successful in 2m32s
Test / test_snapshot_chain_ec (push) Failing after 3m5s
Test / test_rebalance_verify_imm (push) Successful in 2m52s
Test / test_write (push) Successful in 33s
Test / test_rebalance_verify (push) Successful in 3m48s
Test / test_write_no_same (push) Successful in 13s
Test / test_rebalance_verify_ec_imm (push) Successful in 3m11s
Test / test_rebalance_verify_ec (push) Successful in 4m11s
Test / test_write_xor (push) Failing after 3m10s
Test / test_heal_pg_size_2 (push) Successful in 3m50s
Test / test_heal_csum_32k_dmj (push) Successful in 5m15s
Test / test_heal_ec (push) Successful in 6m34s
Test / test_heal_csum_32k_dj (push) Successful in 6m19s
Test / test_heal_csum_32k (push) Successful in 6m26s
Test / test_scrub (push) Successful in 1m16s
Test / test_scrub_zero_osd_2 (push) Successful in 1m15s
Test / test_scrub_xor (push) Successful in 1m24s
Test / test_heal_csum_4k_dmj (push) Successful in 7m0s
Test / test_heal_csum_4k_dj (push) Successful in 6m18s
Test / test_scrub_pg_size_6_pg_minsize_4_osd_count_6_ec (push) Successful in 1m4s
Test / test_heal_csum_4k (push) Successful in 6m8s
Test / test_scrub_ec (push) Successful in 57s
Test / test_scrub_pg_size_3 (push) Successful in 1m17s
2023-11-04 01:34:18 +03:00
63 changed files with 355 additions and 3796 deletions

View File

@@ -2,6 +2,6 @@ cmake_minimum_required(VERSION 2.8.12)
project(vitastor)
set(VERSION "1.2.0")
set(VERSION "1.1.0")
add_subdirectory(src)

View File

@@ -1,4 +1,4 @@
VERSION ?= v1.2.0
VERSION ?= v1.1.0
all: build push

View File

@@ -49,7 +49,7 @@ spec:
capabilities:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
image: vitalif/vitastor-csi:v1.2.0
image: vitalif/vitastor-csi:v1.1.0
args:
- "--node=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)"

View File

@@ -121,7 +121,7 @@ spec:
privileged: true
capabilities:
add: ["SYS_ADMIN"]
image: vitalif/vitastor-csi:v1.2.0
image: vitalif/vitastor-csi:v1.1.0
args:
- "--node=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)"

View File

@@ -5,7 +5,7 @@ package vitastor
const (
vitastorCSIDriverName = "csi.vitastor.io"
vitastorCSIDriverVersion = "1.2.0"
vitastorCSIDriverVersion = "1.1.0"
)
// Config struct fills the parameters of request or user input

4
debian/changelog vendored
View File

@@ -1,10 +1,10 @@
vitastor (1.2.0-1) unstable; urgency=medium
vitastor (1.1.0-1) unstable; urgency=medium
* Bugfixes
-- Vitaliy Filippov <vitalif@yourcmc.ru> Fri, 03 Jun 2022 02:09:44 +0300
vitastor (1.2.0-1) unstable; urgency=medium
vitastor (1.1.0-1) unstable; urgency=medium
* Implement NFS proxy
* Add documentation

View File

@@ -35,8 +35,8 @@ RUN set -e -x; \
mkdir -p /root/packages/vitastor-$REL; \
rm -rf /root/packages/vitastor-$REL/*; \
cd /root/packages/vitastor-$REL; \
cp -r /root/vitastor vitastor-1.2.0; \
cd vitastor-1.2.0; \
cp -r /root/vitastor vitastor-1.1.0; \
cd vitastor-1.1.0; \
ln -s /root/fio-build/fio-*/ ./fio; \
FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
ls /usr/include/linux/raw.h || cp ./debian/raw.h /usr/include/linux/raw.h; \
@@ -49,8 +49,8 @@ RUN set -e -x; \
rm -rf a b; \
echo "dep:fio=$FIO" > debian/fio_version; \
cd /root/packages/vitastor-$REL; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_1.2.0.orig.tar.xz vitastor-1.2.0; \
cd vitastor-1.2.0; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_1.1.0.orig.tar.xz vitastor-1.1.0; \
cd vitastor-1.1.0; \
V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \

View File

@@ -20,7 +20,6 @@ between clients, OSDs and etcd.
- [rdma_max_msg](#rdma_max_msg)
- [rdma_max_recv](#rdma_max_recv)
- [rdma_max_send](#rdma_max_send)
- [rdma_odp](#rdma_odp)
- [peer_connect_interval](#peer_connect_interval)
- [peer_connect_timeout](#peer_connect_timeout)
- [osd_idle_timeout](#osd_idle_timeout)
@@ -69,14 +68,11 @@ but they are not connected to the cluster.
- Type: string
RDMA device name to use for Vitastor OSD communications (for example,
"rocep5s0f0"). Now Vitastor supports all adapters, even ones without
ODP support, like Mellanox ConnectX-3 and non-Mellanox cards.
Versions up to Vitastor 1.2.0 required ODP which is only present in
Mellanox ConnectX >= 4. See also [rdma_odp](#rdma_odp).
Run `ibv_devinfo -v` as root to list available RDMA devices and their
features.
"rocep5s0f0"). Please note that Vitastor RDMA requires Implicit On-Demand
Paging (Implicit ODP) and Scatter/Gather (SG) support from the RDMA device
to work. For example, Mellanox ConnectX-3 and older adapters don't have
Implicit ODP, so they're unsupported by Vitastor. Run `ibv_devinfo -v` as
root to list available RDMA devices and their features.
Remember that you also have to configure your network switches if you use
RoCE/RoCEv2, otherwise you may experience unstable performance. Refer to
@@ -151,28 +147,6 @@ less than `rdma_max_recv` so the receiving side doesn't run out of buffers.
Doesn't affect memory usage - additional memory isn't allocated for send
operations.
## rdma_odp
- Type: boolean
- Default: false
Use RDMA with On-Demand Paging. ODP is currently only available on Mellanox
ConnectX-4 and newer adapters. ODP allows to not register memory explicitly
for RDMA adapter to be able to use it. This, in turn, allows to skip memory
copying during sending. One would think this should improve performance, but
**in reality** RDMA performance with ODP is **drastically** worse. Example
3-node cluster with 8 NVMe in each node and 2*25 GBit/s ConnectX-6 RDMA network
without ODP pushes 3950000 read iops, but only 239000 iops with ODP...
This happens because Mellanox ODP implementation seems to be based on
message retransmissions when the adapter doesn't know about the buffer yet -
it likely uses standard "RNR retransmissions" (RNR = receiver not ready)
which is generally slow in RDMA/RoCE networks. Here's a presentation about
it from ISPASS-2021 conference: https://tkygtr6.github.io/pub/ISPASS21_slides.pdf
ODP support is retained in the code just in case a good ODP implementation
appears one day.
## peer_connect_interval
- Type: seconds

View File

@@ -20,7 +20,6 @@
- [rdma_max_msg](#rdma_max_msg)
- [rdma_max_recv](#rdma_max_recv)
- [rdma_max_send](#rdma_max_send)
- [rdma_odp](#rdma_odp)
- [peer_connect_interval](#peer_connect_interval)
- [peer_connect_timeout](#peer_connect_timeout)
- [osd_idle_timeout](#osd_idle_timeout)
@@ -72,15 +71,12 @@ RDMA может быть нужно только если у клиентов е
- Тип: строка
Название RDMA-устройства для связи с Vitastor OSD (например, "rocep5s0f0").
Сейчас Vitastor поддерживает все модели адаптеров, включая те, у которых
нет поддержки ODP, то есть вы можете использовать RDMA с ConnectX-3 и
картами производства не Mellanox.
Версии Vitastor до 1.2.0 включительно требовали ODP, который есть только
на Mellanox ConnectX 4 и более новых. См. также [rdma_odp](#rdma_odp).
Запустите `ibv_devinfo -v` от имени суперпользователя, чтобы посмотреть
список доступных RDMA-устройств, их параметры и возможности.
Имейте в виду, что поддержка RDMA в Vitastor требует функций устройства
Implicit On-Demand Paging (Implicit ODP) и Scatter/Gather (SG). Например,
адаптеры Mellanox ConnectX-3 и более старые не поддерживают Implicit ODP и
потому не поддерживаются в Vitastor. Запустите `ibv_devinfo -v` от имени
суперпользователя, чтобы посмотреть список доступных RDMA-устройств, их
параметры и возможности.
Обратите внимание, что если вы используете RoCE/RoCEv2, вам также необходимо
правильно настроить для него коммутаторы, иначе вы можете столкнуться с
@@ -159,29 +155,6 @@ OSD в любом случае согласовывают реальное зн
Не влияет на потребление памяти - дополнительная память на операции отправки
не выделяется.
## rdma_odp
- Тип: булево (да/нет)
- Значение по умолчанию: false
Использовать RDMA с On-Demand Paging. ODP - функция, доступная пока что
исключительно на адаптерах Mellanox ConnectX-4 и более новых. ODP позволяет
не регистрировать память для её использования RDMA-картой. Благодаря этому
можно не копировать данные при отправке их в сеть и, казалось бы, это должно
улучшать производительность - но **по факту** получается так, что
производительность только ухудшается, причём сильно. Пример - на 3-узловом
кластере с 8 NVMe в каждом узле и сетью 2*25 Гбит/с на чтение с RDMA без ODP
удаётся снять 3950000 iops, а с ODP - всего 239000 iops...
Это происходит из-за того, что реализация ODP у Mellanox неоптимальная и
основана на повторной передаче сообщений, когда карте не известен буфер -
вероятно, на стандартных "RNR retransmission" (RNR = receiver not ready).
А данные повторные передачи в RDMA/RoCE - всегда очень медленная штука.
Презентация на эту тему с конференции ISPASS-2021: https://tkygtr6.github.io/pub/ISPASS21_slides.pdf
Возможность использования ODP сохранена в коде на случай, если вдруг в один
прекрасный день появится хорошая реализация ODP.
## peer_connect_interval
- Тип: секунды

View File

@@ -30,6 +30,18 @@
будут использоваться обычные синхронные системные вызовы send/recv. Для OSD
это бессмысленно, так как OSD в любом случае нуждается в io_uring, но, в
принципе, это может применяться для клиентов со старыми версиями ядра.
- name: use_zerocopy_send
type: bool
default: false
info: |
If true, OSDs and clients will attempt to use TCP zero-copy send
(MSG_ZEROCOPY) for big buffers. It's recommended to raise net.ipv4.tcp_wmem
and net.core.wmem_max sysctls when using this mode.
info_ru: |
Если установлено в true, то OSD и клиенты будут стараться использовать
TCP-отправку без копирования (MSG_ZEROCOPY) для больших буферов данных.
Рекомендуется поднять значения sysctl net.ipv4.tcp_wmem и net.core.wmem_max
при использовании этого режима.
- name: use_rdma
type: bool
default: true
@@ -48,14 +60,11 @@
type: string
info: |
RDMA device name to use for Vitastor OSD communications (for example,
"rocep5s0f0"). Now Vitastor supports all adapters, even ones without
ODP support, like Mellanox ConnectX-3 and non-Mellanox cards.
Versions up to Vitastor 1.2.0 required ODP which is only present in
Mellanox ConnectX >= 4. See also [rdma_odp](#rdma_odp).
Run `ibv_devinfo -v` as root to list available RDMA devices and their
features.
"rocep5s0f0"). Please note that Vitastor RDMA requires Implicit On-Demand
Paging (Implicit ODP) and Scatter/Gather (SG) support from the RDMA device
to work. For example, Mellanox ConnectX-3 and older adapters don't have
Implicit ODP, so they're unsupported by Vitastor. Run `ibv_devinfo -v` as
root to list available RDMA devices and their features.
Remember that you also have to configure your network switches if you use
RoCE/RoCEv2, otherwise you may experience unstable performance. Refer to
@@ -64,15 +73,12 @@
PFC (Priority Flow Control) and ECN (Explicit Congestion Notification).
info_ru: |
Название RDMA-устройства для связи с Vitastor OSD (например, "rocep5s0f0").
Сейчас Vitastor поддерживает все модели адаптеров, включая те, у которых
нет поддержки ODP, то есть вы можете использовать RDMA с ConnectX-3 и
картами производства не Mellanox.
Версии Vitastor до 1.2.0 включительно требовали ODP, который есть только
на Mellanox ConnectX 4 и более новых. См. также [rdma_odp](#rdma_odp).
Запустите `ibv_devinfo -v` от имени суперпользователя, чтобы посмотреть
список доступных RDMA-устройств, их параметры и возможности.
Имейте в виду, что поддержка RDMA в Vitastor требует функций устройства
Implicit On-Demand Paging (Implicit ODP) и Scatter/Gather (SG). Например,
адаптеры Mellanox ConnectX-3 и более старые не поддерживают Implicit ODP и
потому не поддерживаются в Vitastor. Запустите `ibv_devinfo -v` от имени
суперпользователя, чтобы посмотреть список доступных RDMA-устройств, их
параметры и возможности.
Обратите внимание, что если вы используете RoCE/RoCEv2, вам также необходимо
правильно настроить для него коммутаторы, иначе вы можете столкнуться с
@@ -166,45 +172,6 @@
у принимающей стороны в процессе работы не заканчивались буферы на приём.
Не влияет на потребление памяти - дополнительная память на операции отправки
не выделяется.
- name: rdma_odp
type: bool
default: false
online: false
info: |
Use RDMA with On-Demand Paging. ODP is currently only available on Mellanox
ConnectX-4 and newer adapters. ODP allows to not register memory explicitly
for RDMA adapter to be able to use it. This, in turn, allows to skip memory
copying during sending. One would think this should improve performance, but
**in reality** RDMA performance with ODP is **drastically** worse. Example
3-node cluster with 8 NVMe in each node and 2*25 GBit/s ConnectX-6 RDMA network
without ODP pushes 3950000 read iops, but only 239000 iops with ODP...
This happens because Mellanox ODP implementation seems to be based on
message retransmissions when the adapter doesn't know about the buffer yet -
it likely uses standard "RNR retransmissions" (RNR = receiver not ready)
which is generally slow in RDMA/RoCE networks. Here's a presentation about
it from ISPASS-2021 conference: https://tkygtr6.github.io/pub/ISPASS21_slides.pdf
ODP support is retained in the code just in case a good ODP implementation
appears one day.
info_ru: |
Использовать RDMA с On-Demand Paging. ODP - функция, доступная пока что
исключительно на адаптерах Mellanox ConnectX-4 и более новых. ODP позволяет
не регистрировать память для её использования RDMA-картой. Благодаря этому
можно не копировать данные при отправке их в сеть и, казалось бы, это должно
улучшать производительность - но **по факту** получается так, что
производительность только ухудшается, причём сильно. Пример - на 3-узловом
кластере с 8 NVMe в каждом узле и сетью 2*25 Гбит/с на чтение с RDMA без ODP
удаётся снять 3950000 iops, а с ODP - всего 239000 iops...
Это происходит из-за того, что реализация ODP у Mellanox неоптимальная и
основана на повторной передаче сообщений, когда карте не известен буфер -
вероятно, на стандартных "RNR retransmission" (RNR = receiver not ready).
А данные повторные передачи в RDMA/RoCE - всегда очень медленная штука.
Презентация на эту тему с конференции ISPASS-2021: https://tkygtr6.github.io/pub/ISPASS21_slides.pdf
Возможность использования ODP сохранена в коде на случай, если вдруг в один
прекрасный день появится хорошая реализация ODP.
- name: peer_connect_interval
type: sec
min: 1

View File

@@ -17,15 +17,4 @@ and apply all `NNN-*.yaml` manifests to your Kubernetes installation:
for i in ./???-*.yaml; do kubectl apply -f $i; done
```
After that you'll be able to create PersistentVolumes.
## Features
Vitastor CSI supports:
- Kubernetes starting with 1.20 (or 1.17 for older vitastor-csi <= 1.1.0)
- Filesystem RWO (ReadWriteOnce) volumes. Example: [PVC](../../csi/deploy/example-pvc.yaml), [pod](../../csi/deploy/example-test-pod.yaml)
- Raw block RWX (ReadWriteMany) volumes. Example: [PVC](../../csi/deploy/example-pvc-block.yaml), [pod](../../csi/deploy/example-test-pod-block.yaml)
- Volume expansion
- Volume snapshots. Example: [snapshot class](../../csi/deploy/example-snapshot-class.yaml), [snapshot](../../csi/deploy/example-snapshot.yaml), [clone](../../csi/deploy/example-snapshot-clone.yaml)
Remember that to use snapshots with CSI you also have to install [Snapshot Controller and CRDs](https://kubernetes-csi.github.io/docs/snapshot-controller.html#deployment).
After that you'll be able to create PersistentVolumes. See example in [csi/deploy/example-pvc.yaml](../../csi/deploy/example-pvc.yaml).

View File

@@ -17,15 +17,4 @@
for i in ./???-*.yaml; do kubectl apply -f $i; done
```
После этого вы сможете создавать PersistentVolume.
## Возможности
CSI-плагин Vitastor поддерживает:
- Версии Kubernetes, начиная с 1.20 (или с 1.17 для более старых vitastor-csi <= 1.1.0)
- Файловые RWO (ReadWriteOnce) тома. Пример: [PVC](../../csi/deploy/example-pvc.yaml), [под](../../csi/deploy/example-test-pod.yaml)
- Сырые блочные RWX (ReadWriteMany) тома. Пример: [PVC](../../csi/deploy/example-pvc-block.yaml), [под](../../csi/deploy/example-test-pod-block.yaml)
- Расширение размера томов
- Снимки томов. Пример: [класс снимков](../../csi/deploy/example-snapshot-class.yaml), [снимок](../../csi/deploy/example-snapshot.yaml), [клон снимка](../../csi/deploy/example-snapshot-clone.yaml)
Не забывайте, что для использования снимков нужно сначала установить [контроллер снимков и CRD](https://kubernetes-csi.github.io/docs/snapshot-controller.html#deployment).
После этого вы сможете создавать PersistentVolume. Пример смотрите в файле [csi/deploy/example-pvc.yaml](../../csi/deploy/example-pvc.yaml).

View File

@@ -127,46 +127,19 @@ Linux kernel, starting with version 5.15, supports a new interface for attaching
to the host - VDUSE (vDPA Device in Userspace). QEMU, starting with 7.2, has support for
exporting QEMU block devices over this protocol using qemu-storage-daemon.
VDUSE is currently the best interface to attach Vitastor disks as kernel devices because:
- It avoids data copies and thus achieves much better performance than [NBD](nbd.en.md)
- It doesn't have NBD timeout problem - the device doesn't die if an operation executes for too long
- It doesn't have hung device problem - if the userspace process dies it can be restarted (!)
and block device will continue operation
- It doesn't seem to have the device number limit
VDUSE has the same problem as other FUSE-like interfaces in Linux: if a userspace process hangs,
for example, if it loses connectivity with Vitastor cluster - active processes doing I/O may
hang in the D state (uninterruptible sleep) and you won't be able to kill them even with kill -9.
In this case reboot will be the only way to remove VDUSE devices from system.
Example performance comparison:
| | direct fio | NBD | VDUSE |
|----------------------|-------------|-------------|-------------|
| linear write | 3.85 GB/s | 1.12 GB/s | 3.85 GB/s |
| 4k random write Q128 | 240000 iops | 120000 iops | 178000 iops |
| 4k random write Q1 | 9500 iops | 7620 iops | 7640 iops |
| linear read | 4.3 GB/s | 1.8 GB/s | 2.85 GB/s |
| 4k random read Q128 | 287000 iops | 140000 iops | 189000 iops |
| 4k random read Q1 | 9600 iops | 7640 iops | 7780 iops |
On the other hand, VDUSE is faster than [NBD](nbd.en.md), so you may prefer to use it if
performance is important for you. Approximate performance numbers:
direct fio benchmark - 115000 iops, NBD - 60000 iops, VDUSE - 90000 iops.
To try VDUSE you need at least Linux 5.15, built with VDUSE support
(CONFIG_VIRTIO_VDPA=m, CONFIG_VDPA_USER=m, CONFIG_VIRTIO_VDPA=m).
Debian Linux kernels have these options disabled by now, so if you want to try it on Debian,
use a kernel from Ubuntu [kernel-ppa/mainline](https://kernel.ubuntu.com/~kernel-ppa/mainline/), Proxmox,
or build modules for Debian kernel manually:
```
mkdir build
cd build
apt-get install linux-headers-`uname -r`
apt-get build-dep linux-image-`uname -r`-unsigned
apt-get source linux-image-`uname -r`-unsigned
cd linux*/drivers/vdpa
make -C /lib/modules/`uname -r`/build M=$PWD CONFIG_VDPA=m CONFIG_VDPA_USER=m CONFIG_VIRTIO_VDPA=m -j8 modules modules_install
cat Module.symvers >> /lib/modules/`uname -r`/build/Module.symvers
cd ../virtio
make -C /lib/modules/`uname -r`/build M=$PWD CONFIG_VDPA=m CONFIG_VDPA_USER=m CONFIG_VIRTIO_VDPA=m -j8 modules modules_install
depmod -a
```
You also need `vdpa` tool from the `iproute2` package.
(CONFIG_VIRTIO_VDPA=m and CONFIG_VDPA_USER=m). Debian Linux kernels have these options
disabled by now, so if you want to try it on Debian, use a kernel from Ubuntu
[kernel-ppa/mainline](https://kernel.ubuntu.com/~kernel-ppa/mainline/) or Proxmox.
Commands to attach Vitastor image as a VDUSE device:
@@ -179,7 +152,7 @@ qemu-storage-daemon --daemonize --blockdev '{"node-name":"test1","driver":"vitas
vdpa dev add name test1 mgmtdev vduse
```
After running these commands, `/dev/vda` device will appear in the system and you'll be able to
After running these commands /dev/vda device will appear in the system and you'll be able to
use it as a normal disk.
To remove the device:

View File

@@ -129,47 +129,19 @@ qemu-system-x86_64 -enable-kvm -m 2048 -M accel=kvm,memory-backend=mem \
к системе - VDUSE (vDPA Device in Userspace), а в QEMU, начиная с версии 7.2, есть поддержка
экспорта блочных устройств QEMU по этому протоколу через qemu-storage-daemon.
VDUSE - на данный момент лучший интерфейс для подключения дисков Vitastor в виде блочных
устройств на уровне ядра, ибо:
- VDUSE не копирует данные и поэтому достигает значительно лучшей производительности, чем [NBD](nbd.ru.md)
- Также оно не имеет проблемы NBD-таймаута - устройство не умирает, если операция выполняется слишком долго
- Также оно не имеет проблемы подвисающих устройств - если процесс-обработчик умирает, его можно
перезапустить (!) и блочное устройство продолжит работать
- По-видимому, у него нет предела числа подключаемых в систему устройств
VDUSE страдает общей проблемой FUSE-подобных интерфейсов в Linux: если пользовательский процесс
подвиснет, например, если будет потеряна связь с кластером Vitastor - читающие/пишущие в кластер
процессы могут "залипнуть" в состоянии D (непрерываемый сон) и их будет невозможно убить даже
через kill -9. В этом случае удалить из системы устройство можно только перезагрузившись.
Пример сравнения производительности:
С другой стороны, VDUSE быстрее по сравнению с [NBD](nbd.ru.md), поэтому его может
быть предпочтительно использовать там, где производительность важнее. Порядок показателей:
прямое тестирование через fio - 115000 iops, NBD - 60000 iops, VDUSE - 90000 iops.
| | Прямой fio | NBD | VDUSE |
|--------------------------|-------------|-------------|-------------|
| линейная запись | 3.85 GB/s | 1.12 GB/s | 3.85 GB/s |
| 4k случайная запись Q128 | 240000 iops | 120000 iops | 178000 iops |
| 4k случайная запись Q1 | 9500 iops | 7620 iops | 7640 iops |
| линейное чтение | 4.3 GB/s | 1.8 GB/s | 2.85 GB/s |
| 4k случайное чтение Q128 | 287000 iops | 140000 iops | 189000 iops |
| 4k случайное чтение Q1 | 9600 iops | 7640 iops | 7780 iops |
Чтобы попробовать VDUSE, вам нужно ядро Linux как минимум версии 5.15, собранное с поддержкой
VDUSE (CONFIG_VIRTIO_VDPA=m, CONFIG_VDPA_USER=m, CONFIG_VIRTIO_VDPA=m).
В ядрах в Debian Linux поддержка пока отключена по умолчанию, так что чтобы попробовать VDUSE
на Debian, поставьте ядро из Ubuntu [kernel-ppa/mainline](https://kernel.ubuntu.com/~kernel-ppa/mainline/),
из Proxmox или соберите модули для ядра Debian вручную:
```
mkdir build
cd build
apt-get install linux-headers-`uname -r`
apt-get build-dep linux-image-`uname -r`-unsigned
apt-get source linux-image-`uname -r`-unsigned
cd linux*/drivers/vdpa
make -C /lib/modules/`uname -r`/build M=$PWD CONFIG_VDPA=m CONFIG_VDPA_USER=m CONFIG_VIRTIO_VDPA=m -j8 modules modules_install
cat Module.symvers >> /lib/modules/`uname -r`/build/Module.symvers
cd ../virtio
make -C /lib/modules/`uname -r`/build M=$PWD CONFIG_VDPA=m CONFIG_VDPA_USER=m CONFIG_VIRTIO_VDPA=m -j8 modules modules_install
depmod -a
```
Также вам понадобится консольная утилита `vdpa` из пакета `iproute2`.
Чтобы использовать VDUSE, вам нужно ядро Linux версии хотя бы 5.15, собранное с поддержкой
VDUSE (CONFIG_VIRTIO_VDPA=m и CONFIG_VDPA_USER=m). В ядрах в Debian Linux поддержка пока
отключена - если хотите попробовать эту функцию на Debian, поставьте ядро из Ubuntu
[kernel-ppa/mainline](https://kernel.ubuntu.com/~kernel-ppa/mainline/) или из Proxmox.
Команды для подключения виртуального диска через VDUSE:
@@ -182,7 +154,7 @@ qemu-storage-daemon --daemonize --blockdev '{"node-name":"test1","driver":"vitas
vdpa dev add name test1 mgmtdev vduse
```
После этого в системе появится устройство `/dev/vda`, которое можно будет использовать как
После этого в системе появится устройство /dev/vda, которое можно будет использовать как
обычный диск.
Для удаления устройства из системы:

2
json11

Submodule json11 updated: fd37016cf8...52a3af664f

View File

@@ -65,6 +65,7 @@ const etcd_tree = {
// client and osd
tcp_header_buffer_size: 65536,
use_sync_send_recv: false,
use_zerocopy_send: false,
use_rdma: true,
rdma_device: null, // for example, "rocep5s0f0"
rdma_port_num: 1,
@@ -403,7 +404,6 @@ class Mon
this.ws_alive = false;
this.ws_keepalive_timer = null;
this.on_stop_cb = () => this.on_stop(0).catch(console.error);
this.recheck_pgs_active = false;
}
parse_etcd_addresses(addrs)
@@ -693,27 +693,8 @@ class Mon
});
}
// Schedule save_last_clean() to to run after a small timeout (1s) (to not spam etcd)
schedule_save_last_clean()
{
if (!this.save_last_clean_timer)
{
this.save_last_clean_timer = setTimeout(() =>
{
this.save_last_clean_timer = null;
this.save_last_clean().catch(this.die);
}, this.config.mon_change_timeout || 1000);
}
}
async save_last_clean()
{
if (this.save_last_clean_running)
{
this.schedule_save_last_clean();
return;
}
this.save_last_clean_running = true;
// last_clean_pgs is used to avoid extra data move when observing a series of changes in the cluster
const new_clean_pgs = { items: {} };
next_pool:
@@ -750,7 +731,6 @@ class Mon
value: b64(JSON.stringify(this.state.history.last_clean_pgs))
} } ],
}, this.etcd_start_timeout, 0);
this.save_last_clean_running = false;
}
get_mon_state()
@@ -1224,12 +1204,6 @@ class Mon
async recheck_pgs()
{
if (this.recheck_pgs_active)
{
this.schedule_recheck();
return;
}
this.recheck_pgs_active = true;
// Take configuration and state, check it against the stored configuration hash
// Recalculate PGs and save them to etcd if the configuration is changed
// FIXME: Do not change anything if the distribution is good and random enough and no PGs are degraded
@@ -1251,7 +1225,6 @@ class Mon
// Pool deleted. Delete all PGs, but first stop them.
if (!await this.stop_all_pgs(pool_id))
{
this.recheck_pgs_active = false;
this.schedule_recheck();
return;
}
@@ -1320,16 +1293,9 @@ class Mon
// PG count changed. Need to bring all PGs down.
if (!await this.stop_all_pgs(pool_id))
{
this.recheck_pgs_active = false;
this.schedule_recheck();
return;
}
}
if (prev_pgs.length != pool_cfg.pg_count)
{
// Scale PG count
// Do it even if old_pg_count is already equal to pool_cfg.pg_count,
// because last_clean_pgs may still contain the old number of PGs
const new_pg_history = [];
PGUtil.scale_pg_count(prev_pgs, real_prev_pgs, pg_history, new_pg_history, pool_cfg.pg_count);
pg_history = new_pg_history;
@@ -1431,7 +1397,6 @@ class Mon
await this.save_pg_config(new_config_pgs);
}
}
this.recheck_pgs_active = false;
}
async save_pg_config(new_config_pgs, etcd_request = { compare: [], success: [] })
@@ -1481,6 +1446,7 @@ class Mon
}
// Schedule a recheck to run after a small timeout (1s)
// If already scheduled, cancel previous timer and schedule it again
// This is required for multiple change events to trigger at most 1 recheck in 1s
schedule_recheck()
{
@@ -1498,7 +1464,7 @@ class Mon
{
const zero_stats = { op: { bps: 0n, iops: 0n, lat: 0n }, subop: { iops: 0n, lat: 0n }, recovery: { bps: 0n, iops: 0n } };
const diff = { op_stats: {}, subop_stats: {}, recovery_stats: {}, inode_stats: {} };
if (!st || !st.time || !prev || !prev.time || prev.time >= st.time)
if (!st || !st.time || !prev || prev.time >= st.time)
{
return prev_diff || diff;
}

View File

@@ -1,6 +1,6 @@
{
"name": "vitastor-mon",
"version": "1.2.0",
"version": "1.1.0",
"description": "Vitastor SDS monitor service",
"main": "mon-main.js",
"scripts": {

View File

@@ -50,7 +50,7 @@ from cinder.volume import configuration
from cinder.volume import driver
from cinder.volume import volume_utils
VERSION = '1.2.0'
VERSION = '1.1.0'
LOG = logging.getLogger(__name__)

View File

@@ -24,4 +24,4 @@ rm fio
mv fio-copy fio
FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
tar --transform 's#^#vitastor-1.2.0/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-1.2.0$(rpm --eval '%dist').tar.gz *
tar --transform 's#^#vitastor-1.1.0/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-1.1.0$(rpm --eval '%dist').tar.gz *

View File

@@ -35,7 +35,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-1.2.0.el7.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-1.1.0.el7.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 1.2.0
Version: 1.1.0
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-1.2.0.el7.tar.gz
Source0: vitastor-1.1.0.el7.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel

View File

@@ -35,7 +35,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-1.2.0.el8.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-1.1.0.el8.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 1.2.0
Version: 1.1.0
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-1.2.0.el8.tar.gz
Source0: vitastor-1.1.0.el8.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel

View File

@@ -18,7 +18,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-1.2.0.el9.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-1.1.0.el9.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el9.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 1.2.0
Version: 1.1.0
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-1.2.0.el9.tar.gz
Source0: vitastor-1.1.0.el9.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel

View File

@@ -16,11 +16,10 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}")
endif()
add_definitions(-DVERSION="1.2.0")
add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fdiagnostics-color=always -fno-omit-frame-pointer -I ${CMAKE_SOURCE_DIR}/src)
add_link_options(-fno-omit-frame-pointer)
add_definitions(-DVERSION="1.1.0")
add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fdiagnostics-color=always -I ${CMAKE_SOURCE_DIR}/src)
if (${WITH_ASAN})
add_definitions(-fsanitize=address)
add_definitions(-fsanitize=address -fno-omit-frame-pointer)
add_link_options(-fsanitize=address -fno-omit-frame-pointer)
endif (${WITH_ASAN})
@@ -181,25 +180,6 @@ target_link_libraries(vitastor-nbd
vitastor_client
)
# vitastor-kv
add_executable(vitastor-kv
kv_cli.cpp
kv_db.cpp
kv_db.h
)
target_link_libraries(vitastor-kv
vitastor_client
)
add_executable(vitastor-kv-stress
kv_stress.cpp
kv_db.cpp
kv_db.h
)
target_link_libraries(vitastor-kv-stress
vitastor_client
)
# vitastor-nfs
add_executable(vitastor-nfs
nfs_proxy.cpp

View File

@@ -1372,8 +1372,7 @@ bool journal_flusher_co::trim_journal(int wait_base)
? (uint32_t)JE_START_V1_SIZE : (uint32_t)JE_START_V2_SIZE),
.reserved = 0,
.journal_start = new_trim_pos,
.version = (uint64_t)(!bs->dsk.data_csum_type && ((journal_entry_start*)flusher->journal_superblock)->version == JOURNAL_VERSION_V1
? JOURNAL_VERSION_V1 : JOURNAL_VERSION_V2),
.version = JOURNAL_VERSION_V2,
.data_csum_type = bs->dsk.data_csum_type,
.csum_block_size = bs->dsk.csum_block_size,
};

View File

@@ -274,7 +274,7 @@ class blockstore_impl_t
blockstore_dirty_db_t dirty_db;
std::vector<blockstore_op_t*> submit_queue;
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
int unsynced_big_write_count = 0, unstable_unsynced = 0;
int unsynced_big_write_count = 0;
int unsynced_queued_ops = 0;
allocator *data_alloc = NULL;
uint8_t *zero_object;

View File

@@ -553,7 +553,7 @@ resume_1:
}
if (je_start->size == JE_START_V0_SIZE ||
(je_start->version != JOURNAL_VERSION_V1 || je_start->size != JE_START_V1_SIZE) &&
(je_start->version != JOURNAL_VERSION_V2 || je_start->size != JE_START_V2_SIZE && je_start->size != JE_START_V1_SIZE))
(je_start->version != JOURNAL_VERSION_V2 || je_start->size != JE_START_V2_SIZE))
{
fprintf(
stderr, "The code only supports journal versions 2 and 1, but it is %lu on disk."
@@ -562,8 +562,7 @@ resume_1:
);
exit(1);
}
if (je_start->version == JOURNAL_VERSION_V1 ||
je_start->version == JOURNAL_VERSION_V2 && je_start->size == JE_START_V1_SIZE)
if (je_start->version == JOURNAL_VERSION_V1)
{
je_start->data_csum_type = 0;
je_start->csum_block_size = 0;

View File

@@ -145,7 +145,6 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
journal.sector_info[journal.cur_sector].offset = journal.next_free;
journal.in_sector_pos = 0;
journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size;
assert(journal.next_free != journal.used_start);
memset(journal.inmemory
? (uint8_t*)journal.buffer + journal.sector_info[journal.cur_sector].offset
: (uint8_t*)journal.sector_buf + journal.block_size*journal.cur_sector, 0, journal.block_size);

View File

@@ -13,6 +13,12 @@
#define JOURNAL_BUFFER_SIZE 4*1024*1024
#define JOURNAL_ENTRY_HEADER_SIZE 16
// We reserve some extra space for future stabilize requests during writes
// FIXME: This value should be dynamic i.e. Blockstore ideally shouldn't allow
// writing more than can be stabilized afterwards
#define JOURNAL_STABILIZE_RESERVATION 65536
#define JOURNAL_INSTANT_RESERVATION 131072
// Journal entries
// Journal entries are linked to each other by their crc32 value
// The journal is almost a blockchain, because object versions constantly increase

View File

@@ -86,15 +86,14 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
auto & dirty_entry = dirty_db.at(sbw);
uint64_t dyn_size = dsk.dirty_dyn_size(dirty_entry.offset, dirty_entry.len);
if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size,
(unstable_writes.size()+unstable_unsynced)*journal.block_size))
left == 0 ? JOURNAL_STABILIZE_RESERVATION : 0))
{
return 0;
}
}
}
else if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(),
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size,
(unstable_writes.size()+unstable_unsynced)*journal.block_size))
sizeof(journal_entry_big_write) + dsk.clean_entry_bitmap_size, JOURNAL_STABILIZE_RESERVATION))
{
return 0;
}
@@ -185,11 +184,6 @@ void blockstore_impl_t::ack_sync(blockstore_op_t *op)
{
mark_stable(dirty_it->first);
}
else
{
unstable_unsynced--;
assert(unstable_unsynced >= 0);
}
dirty_it++;
while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid)
{
@@ -220,11 +214,6 @@ void blockstore_impl_t::ack_sync(blockstore_op_t *op)
{
mark_stable(*it);
}
else
{
unstable_unsynced--;
assert(unstable_unsynced >= 0);
}
}
}
op->retval = 0;

View File

@@ -21,7 +21,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
dyn = calloc_or_die(1, dyn_size+sizeof(int));
*((int*)dyn) = 1;
}
uint8_t *dyn_ptr = (alloc_dyn_data ? (uint8_t*)dyn+sizeof(int) : (uint8_t*)&dyn);
uint8_t *dyn_ptr = (uint8_t*)(alloc_dyn_data ? dyn+sizeof(int) : &dyn);
uint64_t version = 1;
if (dirty_db.size() > 0)
{
@@ -320,7 +320,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, unsynced_big_write_count + 1,
sizeof(journal_entry_big_write) + dsk.clean_dyn_size,
(unstable_writes.size()+unstable_unsynced)*journal.block_size))
(dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION))
{
return 0;
}
@@ -386,10 +386,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
sqe, dsk.data_fd, PRIV(op)->iov_zerofill, vcnt, dsk.data_offset + (loc << dsk.block_order) + op->offset - stripe_offset
);
PRIV(op)->pending_ops = 1;
if (immediate_commit != IMMEDIATE_ALL && !(dirty_it->second.state & BS_ST_INSTANT))
{
unstable_unsynced++;
}
if (immediate_commit != IMMEDIATE_ALL)
{
// Increase the counter, but don't save into unsynced_writes yet (can't sync until the write is finished)
@@ -412,7 +408,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
sizeof(journal_entry_big_write) + dsk.clean_dyn_size, 0)
|| !space_check.check_available(op, 1,
sizeof(journal_entry_small_write) + dyn_size,
(unstable_writes.size()+unstable_unsynced)*journal.block_size))
op->len + ((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION)))
{
return 0;
}
@@ -503,11 +499,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
if (journal.next_free >= journal.len)
{
journal.next_free = dsk.journal_block_size;
assert(journal.next_free != journal.used_start);
}
if (immediate_commit == IMMEDIATE_NONE && !(dirty_it->second.state & BS_ST_INSTANT))
{
unstable_unsynced++;
}
if (!PRIV(op)->pending_ops)
{
@@ -547,7 +538,7 @@ resume_2:
uint64_t dyn_size = dsk.dirty_dyn_size(op->offset, op->len);
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, 1, sizeof(journal_entry_big_write) + dyn_size,
(unstable_writes.size()+unstable_unsynced)*journal.block_size))
((dirty_it->second.state & BS_ST_INSTANT) ? JOURNAL_INSTANT_RESERVATION : JOURNAL_STABILIZE_RESERVATION)))
{
return 0;
}
@@ -591,20 +582,14 @@ resume_4:
#endif
bool is_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE;
bool imm = is_big ? (immediate_commit == IMMEDIATE_ALL) : (immediate_commit != IMMEDIATE_NONE);
bool is_instant = ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT));
if (imm)
{
auto & unstab = unstable_writes[op->oid];
unstab = unstab < op->version ? op->version : unstab;
}
else if (!is_instant)
{
unstable_unsynced--;
assert(unstable_unsynced >= 0);
}
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK)
| (imm ? BS_ST_SYNCED : BS_ST_WRITTEN);
if (imm && is_instant)
if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT)))
{
// Deletions and 'instant' operations are treated as immediately stable
mark_stable(dirty_it->first);
@@ -750,7 +735,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
});
assert(dirty_it != dirty_db.end());
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, 1, sizeof(journal_entry_del), (unstable_writes.size()+unstable_unsynced)*journal.block_size))
if (!space_check.check_available(op, 1, sizeof(journal_entry_del), JOURNAL_INSTANT_RESERVATION))
{
return 0;
}

View File

@@ -17,7 +17,7 @@
static const char *exe_name = NULL;
static const char* help_text =
"Vitastor command-line tool " VERSION "\n"
"Vitastor command-line tool\n"
"(c) Vitaliy Filippov, 2019+ (VNPL-1.1)\n"
"\n"
"COMMANDS:\n"
@@ -331,7 +331,7 @@ static int run(cli_tool_t *p, json11::Json::object cfg)
{
// Create client
json11::Json cfg_j = cfg;
p->ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
p->ringloop = new ring_loop_t(512);
p->epmgr = new epoll_manager_t(p->ringloop);
p->cli = new cluster_client_t(p->ringloop, p->epmgr->tfd, cfg_j);
// Smaller timeout by default for more interactiveness

View File

@@ -109,7 +109,7 @@ resume_1:
}
for (auto pg_per_pair: pg_per_osd)
{
uint64_t pg_free = osd_free[pg_per_pair.first] * pool_cfg.real_pg_count / pg_per_pair.second;
uint64_t pg_free = osd_free[pg_per_pair.first] * pool_cfg.pg_count / pg_per_pair.second;
if (pool_avail > pg_free)
{
pool_avail = pg_free;
@@ -127,7 +127,6 @@ resume_1:
{ "id", (uint64_t)pool_cfg.id },
{ "name", pool_cfg.name },
{ "pg_count", pool_cfg.pg_count },
{ "real_pg_count", pool_cfg.real_pg_count },
{ "scheme", pool_cfg.scheme == POOL_SCHEME_REPLICATED ? "replicated" : "ec" },
{ "scheme_name", pool_cfg.scheme == POOL_SCHEME_REPLICATED
? std::to_string(pool_cfg.pg_size)+"/"+std::to_string(pool_cfg.pg_minsize)
@@ -178,7 +177,7 @@ resume_1:
{ "title", "SCHEME" },
});
cols.push_back(json11::Json::object{
{ "key", "pg_count_fmt" },
{ "key", "pg_count" },
{ "title", "PGS" },
});
cols.push_back(json11::Json::object{
@@ -207,9 +206,6 @@ resume_1:
double raw_to = kv.second["raw_to_usable"].number_value();
if (raw_to < 0.000001 && raw_to > -0.000001)
raw_to = 1;
kv.second["pg_count_fmt"] = kv.second["real_pg_count"] == kv.second["pg_count"]
? kv.second["real_pg_count"].as_string()
: kv.second["real_pg_count"].as_string()+"->"+kv.second["pg_count"].as_string();
kv.second["total_fmt"] = format_size(kv.second["total_raw"].uint64_value() / raw_to);
kv.second["used_fmt"] = format_size(kv.second["used_raw"].uint64_value() / raw_to);
kv.second["max_avail_fmt"] = format_size(kv.second["max_available"].uint64_value());

View File

@@ -158,7 +158,12 @@ resume_2:
for (auto & pool_pair: parent->cli->st_cli.pool_config)
{
auto & pool_cfg = pool_pair.second;
bool active = pool_cfg.real_pg_count > 0;
bool active = true;
if (pool_cfg.pg_config.size() != pool_cfg.pg_count)
{
active = false;
pgs_by_state["offline"] += pool_cfg.pg_count-pool_cfg.pg_config.size();
}
pool_count++;
for (auto pg_it = pool_cfg.pg_config.begin(); pg_it != pool_cfg.pg_config.end(); pg_it++)
{

View File

@@ -6,7 +6,7 @@
#include "cluster_client_impl.h"
#include "http_client.h" // json_is_true
cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json config)
cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config)
{
wb = new writeback_cache_t();
@@ -64,7 +64,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
cluster_client_t::~cluster_client_t()
{
msgr.repeer_pgs = [](osd_num_t){};
msgr.repeer_pgs = [this](osd_num_t){};
if (ringloop)
{
ringloop->unregister_consumer(&consumer);
@@ -454,7 +454,7 @@ bool cluster_client_t::flush()
wb->start_writebacks(this, 0);
cluster_op_t *sync = new cluster_op_t;
sync->opcode = OSD_OP_SYNC;
sync->callback = [](cluster_op_t *sync)
sync->callback = [this](cluster_op_t *sync)
{
delete sync;
};
@@ -465,7 +465,7 @@ bool cluster_client_t::flush()
bool sync_done = false;
cluster_op_t *sync = new cluster_op_t;
sync->opcode = OSD_OP_SYNC;
sync->callback = [&sync_done](cluster_op_t *sync)
sync->callback = [this, &sync_done](cluster_op_t *sync)
{
delete sync;
sync_done = true;
@@ -532,7 +532,7 @@ void cluster_client_t::execute_internal(cluster_op_t *op)
return;
}
if (op->opcode == OSD_OP_WRITE && enable_writeback && !(op->flags & OP_FLUSH_BUFFER) &&
!op->version /* no CAS writeback */)
!op->version /* FIXME no CAS writeback */)
{
if (wb->writebacks_active >= client_max_writeback_iodepth)
{
@@ -553,7 +553,7 @@ void cluster_client_t::execute_internal(cluster_op_t *op)
}
if (op->opcode == OSD_OP_WRITE && !(op->flags & OP_IMMEDIATE_COMMIT))
{
if (!(op->flags & OP_FLUSH_BUFFER) && !op->version /* no CAS write-repeat */)
if (!(op->flags & OP_FLUSH_BUFFER))
{
wb->copy_write(op, CACHE_WRITTEN);
}
@@ -1152,7 +1152,7 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected
);
}
else if (log_level > 0)
else
{
fprintf(
stderr, "%s operation failed on OSD %lu: retval=%ld (expected %d)\n",

View File

@@ -121,7 +121,7 @@ public:
json11::Json::object cli_config, file_config, etcd_global_config;
json11::Json::object config;
cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json config);
cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config);
~cluster_client_t();
void execute(cluster_op_t *op);
void execute_raw(osd_num_t osd_num, osd_op_t *op);

View File

@@ -263,7 +263,7 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from
}
assert(calc_len == op->len);
writebacks_active++;
op->callback = [this, flush_id](cluster_op_t* op)
op->callback = [this, cli, flush_id](cluster_op_t* op)
{
// Buffer flushes should be always retried, regardless of the error,
// so they should never result in an error here
@@ -383,7 +383,7 @@ static void copy_to_op(cluster_op_t *op, uint64_t offset, uint8_t *buf, uint64_t
auto begin = (cur_offset < offset ? offset : cur_offset);
auto end = (cur_offset+v.iov_len > offset+len ? offset+len : cur_offset+v.iov_len);
memcpy(
(uint8_t*)v.iov_base + begin - cur_offset,
v.iov_base + begin - cur_offset,
buf + (cur_offset <= offset ? 0 : cur_offset-offset),
end - begin
);

View File

@@ -5,7 +5,7 @@
#include "str_util.h"
static const char *help_text =
"Vitastor disk management tool " VERSION "\n"
"Vitastor disk management tool\n"
"(c) Vitaliy Filippov, 2022+ (VNPL-1.1)\n"
"\n"
"COMMANDS:\n"
@@ -229,7 +229,7 @@ int main(int argc, char *argv[])
{
self.options["allow_data_loss"] = "1";
}
else if (argv[i][0] == '-' && argv[i][1] == '-' && i < argc-1)
else if (argv[i][0] == '-' && argv[i][1] == '-')
{
char *key = argv[i]+2;
self.options[key] = argv[++i];

View File

@@ -320,7 +320,7 @@ void disk_tool_t::dump_journal_entry(int num, journal_entry *je, bool json)
if (journal_calc_data_pos != sw.data_offset)
{
printf(json ? ",\"bad_loc\":true,\"calc_loc\":\"0x%lx\""
: " (mismatched, calculated = %08lx)", journal_pos);
: " (mismatched, calculated = %lu)", journal_pos);
}
uint32_t data_csum_size = (!je_start.csum_block_size
? 0

View File

@@ -245,7 +245,7 @@ int disk_tool_t::resize_copy_data()
{
iodepth = 32;
}
ringloop = new ring_loop_t(iodepth < RINGLOOP_DEFAULT_SIZE ? RINGLOOP_DEFAULT_SIZE : iodepth);
ringloop = new ring_loop_t(iodepth < 512 ? 512 : iodepth);
dsk.data_fd = open(dsk.data_device.c_str(), O_DIRECT|O_RDWR);
if (dsk.data_fd < 0)
{

View File

@@ -130,7 +130,7 @@ static int bs_init(struct thread_data *td)
config[p.first] = p.second.dump();
}
}
bsd->ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
bsd->ringloop = new ring_loop_t(512);
bsd->epmgr = new epoll_manager_t(bsd->ringloop);
bsd->bs = new blockstore_t(config, bsd->ringloop, bsd->epmgr->tfd);
while (1)

View File

@@ -1,401 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Vitastor shared key/value database test CLI
#define _XOPEN_SOURCE
#include <limits.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
//#include <signal.h>
#include "epoll_manager.h"
#include "str_util.h"
#include "kv_db.h"
const char *exe_name = NULL;
class kv_cli_t
{
public:
kv_dbw_t *db = NULL;
ring_loop_t *ringloop = NULL;
epoll_manager_t *epmgr = NULL;
cluster_client_t *cli = NULL;
bool interactive = false;
int in_progress = 0;
char *cur_cmd = NULL;
int cur_cmd_size = 0, cur_cmd_alloc = 0;
bool finished = false, eof = false;
json11::Json::object cfg;
~kv_cli_t();
static json11::Json::object parse_args(int narg, const char *args[]);
void run(const json11::Json::object & cfg);
void read_cmd();
void next_cmd();
void handle_cmd(const std::string & cmd, std::function<void()> cb);
};
kv_cli_t::~kv_cli_t()
{
if (cur_cmd)
{
free(cur_cmd);
cur_cmd = NULL;
}
cur_cmd_alloc = 0;
if (db)
delete db;
if (cli)
{
cli->flush();
delete cli;
}
if (epmgr)
delete epmgr;
if (ringloop)
delete ringloop;
}
json11::Json::object kv_cli_t::parse_args(int narg, const char *args[])
{
json11::Json::object cfg;
for (int i = 1; i < narg; i++)
{
if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help"))
{
printf(
"Vitastor Key/Value CLI\n"
"(c) Vitaliy Filippov, 2023+ (VNPL-1.1)\n"
"\n"
"USAGE: %s [--etcd_address ADDR] [OTHER OPTIONS]\n",
exe_name
);
exit(0);
}
else if (args[i][0] == '-' && args[i][1] == '-')
{
const char *opt = args[i]+2;
cfg[opt] = !strcmp(opt, "json") || i == narg-1 ? "1" : args[++i];
}
}
return cfg;
}
void kv_cli_t::run(const json11::Json::object & cfg)
{
// Create client
ringloop = new ring_loop_t(512);
epmgr = new epoll_manager_t(ringloop);
cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
db = new kv_dbw_t(cli);
// Load image metadata
while (!cli->is_ready())
{
ringloop->loop();
if (cli->is_ready())
break;
ringloop->wait();
}
// Run
fcntl(0, F_SETFL, fcntl(0, F_GETFL, 0) | O_NONBLOCK);
try
{
epmgr->tfd->set_fd_handler(0, false, [this](int fd, int events)
{
if (events & EPOLLIN)
{
read_cmd();
}
if (events & EPOLLRDHUP)
{
epmgr->tfd->set_fd_handler(0, false, NULL);
finished = true;
}
});
interactive = true;
printf("> ");
}
catch (std::exception & e)
{
// Can't add to epoll, STDIN is probably a file
read_cmd();
}
while (!finished)
{
ringloop->loop();
if (!finished)
ringloop->wait();
}
// Destroy the client
delete db;
db = NULL;
cli->flush();
delete cli;
delete epmgr;
delete ringloop;
cli = NULL;
epmgr = NULL;
ringloop = NULL;
}
void kv_cli_t::read_cmd()
{
if (!cur_cmd_alloc)
{
cur_cmd_alloc = 65536;
cur_cmd = (char*)malloc_or_die(cur_cmd_alloc);
}
while (cur_cmd_size < cur_cmd_alloc)
{
int r = read(0, cur_cmd+cur_cmd_size, cur_cmd_alloc-cur_cmd_size);
if (r < 0 && errno != EAGAIN)
fprintf(stderr, "Error reading from stdin: %s\n", strerror(errno));
if (r > 0)
cur_cmd_size += r;
if (r == 0)
eof = true;
if (r <= 0)
break;
}
next_cmd();
}
void kv_cli_t::next_cmd()
{
if (in_progress > 0)
{
return;
}
int pos = 0;
for (; pos < cur_cmd_size; pos++)
{
if (cur_cmd[pos] == '\n' || cur_cmd[pos] == '\r')
{
auto cmd = trim(std::string(cur_cmd, pos));
pos++;
memmove(cur_cmd, cur_cmd+pos, cur_cmd_size-pos);
cur_cmd_size -= pos;
in_progress++;
handle_cmd(cmd, [this]()
{
in_progress--;
if (interactive)
printf("> ");
next_cmd();
if (!in_progress)
read_cmd();
});
break;
}
}
if (eof && !in_progress)
{
finished = true;
}
}
void kv_cli_t::handle_cmd(const std::string & cmd, std::function<void()> cb)
{
if (cmd == "")
{
cb();
return;
}
auto pos = cmd.find_first_of(" \t");
if (pos != std::string::npos)
{
while (pos < cmd.size()-1 && (cmd[pos+1] == ' ' || cmd[pos+1] == '\t'))
pos++;
}
auto opname = strtolower(pos == std::string::npos ? cmd : cmd.substr(0, pos));
if (opname == "open")
{
uint64_t pool_id = 0;
inode_t inode_id = 0;
uint32_t kv_block_size = 0;
int scanned = sscanf(cmd.c_str() + pos+1, "%lu %lu %u", &pool_id, &inode_id, &kv_block_size);
if (scanned == 2)
{
kv_block_size = 4096;
}
if (scanned < 2 || !pool_id || !inode_id || !kv_block_size || (kv_block_size & (kv_block_size-1)) != 0)
{
fprintf(stderr, "Usage: open <pool_id> <inode_id> [block_size]. Block size must be a power of 2. Default is 4096.\n");
cb();
return;
}
cfg["kv_block_size"] = (uint64_t)kv_block_size;
db->open(INODE_WITH_POOL(pool_id, inode_id), cfg, [=](int res)
{
if (res < 0)
fprintf(stderr, "Error opening index: %s (code %d)\n", strerror(-res), res);
else
printf("Index opened. Current size: %lu bytes\n", db->get_size());
cb();
});
}
else if (opname == "config")
{
auto pos2 = cmd.find_first_of(" \t", pos+1);
if (pos2 == std::string::npos)
{
fprintf(stderr, "Usage: config <property> <value>\n");
cb();
return;
}
auto key = trim(cmd.substr(pos+1, pos2-pos-1));
auto value = parse_size(trim(cmd.substr(pos2+1)));
if (key != "kv_memory_limit" &&
key != "kv_allocate_blocks" &&
key != "kv_evict_max_misses" &&
key != "kv_evict_attempts_per_level" &&
key != "kv_evict_unused_age" &&
key != "kv_log_level")
{
fprintf(
stderr, "Allowed properties: kv_memory_limit, kv_allocate_blocks,"
" kv_evict_max_misses, kv_evict_attempts_per_level, kv_evict_unused_age, kv_log_level\n"
);
}
else
{
cfg[key] = value;
db->set_config(cfg);
}
cb();
}
else if (opname == "get" || opname == "set" || opname == "del")
{
if (opname == "get" || opname == "del")
{
if (pos == std::string::npos)
{
fprintf(stderr, "Usage: %s <key>\n", opname.c_str());
cb();
return;
}
auto key = trim(cmd.substr(pos+1));
if (opname == "get")
{
db->get(key, [this, cb](int res, const std::string & value)
{
if (res < 0)
fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res);
else
{
write(1, value.c_str(), value.size());
write(1, "\n", 1);
}
cb();
});
}
else
{
db->del(key, [this, cb](int res)
{
if (res < 0)
fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res);
else
printf("OK\n");
cb();
});
}
}
else
{
auto pos2 = cmd.find_first_of(" \t", pos+1);
if (pos2 == std::string::npos)
{
fprintf(stderr, "Usage: set <key> <value>\n");
cb();
return;
}
auto key = trim(cmd.substr(pos+1, pos2-pos-1));
auto value = trim(cmd.substr(pos2+1));
db->set(key, value, [this, cb](int res)
{
if (res < 0)
fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res);
else
printf("OK\n");
cb();
});
}
}
else if (opname == "list")
{
std::string start, end;
if (pos != std::string::npos)
{
auto pos2 = cmd.find_first_of(" \t", pos+1);
if (pos2 != std::string::npos)
{
start = trim(cmd.substr(pos+1, pos2-pos-1));
end = trim(cmd.substr(pos2+1));
}
else
{
start = trim(cmd.substr(pos+1));
}
}
void *handle = db->list_start(start);
db->list_next(handle, [=](int res, const std::string & key, const std::string & value)
{
if (res < 0)
{
if (res != -ENOENT)
{
fprintf(stderr, "Error: %s (code %d)\n", strerror(-res), res);
}
db->list_close(handle);
cb();
}
else
{
printf("%s = %s\n", key.c_str(), value.c_str());
db->list_next(handle, NULL);
}
});
}
else if (opname == "close")
{
db->close([=]()
{
printf("Index closed\n");
cb();
});
}
else if (opname == "quit" || opname == "q")
{
::close(0);
finished = true;
}
else
{
fprintf(
stderr, "Unknown operation: %s. Supported operations:\n"
"open <pool_id> <inode_id> [block_size]\n"
"config <property> <value>\n"
"get <key>\nset <key> <value>\ndel <key>\nlist [<start> [end]]\n"
"close\nquit\n", opname.c_str()
);
cb();
}
}
int main(int narg, const char *args[])
{
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
exe_name = args[0];
kv_cli_t *p = new kv_cli_t();
p->run(kv_cli_t::parse_args(narg, args));
delete p;
return 0;
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,36 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Vitastor shared key/value database
// Parallel optimistic B-Tree O:-)
#pragma once
#include "cluster_client.h"
struct kv_db_t;
struct kv_dbw_t
{
kv_dbw_t(cluster_client_t *cli);
~kv_dbw_t();
void open(inode_t inode_id, json11::Json cfg, std::function<void(int)> cb);
void set_config(json11::Json cfg);
void close(std::function<void()> cb);
uint64_t get_size();
void get(const std::string & key, std::function<void(int res, const std::string & value)> cb,
bool allow_old_cached = false);
void set(const std::string & key, const std::string & value, std::function<void(int res)> cb,
std::function<bool(int res, const std::string & value)> cas_compare = NULL);
void del(const std::string & key, std::function<void(int res)> cb,
std::function<bool(int res, const std::string & value)> cas_compare = NULL);
void* list_start(const std::string & start);
void list_next(void *handle, std::function<void(int res, const std::string & key, const std::string & value)> cb);
void list_close(void *handle);
kv_db_t *db;
};

View File

@@ -1,697 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Vitastor shared key/value database stress tester / benchmark
#define _XOPEN_SOURCE
#include <limits.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
//#include <signal.h>
#include "epoll_manager.h"
#include "str_util.h"
#include "kv_db.h"
const char *exe_name = NULL;
struct kv_test_listing_t
{
uint64_t count = 0, done = 0;
void *handle = NULL;
std::string next_after;
std::set<std::string> inflights;
timespec tv_begin;
bool error = false;
};
struct kv_test_lat_t
{
const char *name = NULL;
uint64_t usec = 0, count = 0;
};
struct kv_test_stat_t
{
kv_test_lat_t get, add, update, del, list;
uint64_t list_keys = 0;
};
class kv_test_t
{
public:
// Config
json11::Json::object kv_cfg;
std::string key_prefix, key_suffix;
uint64_t inode_id = 0;
uint64_t op_count = 1000000;
uint64_t runtime_sec = 0;
uint64_t parallelism = 4;
uint64_t reopen_prob = 1;
uint64_t get_prob = 30000;
uint64_t add_prob = 20000;
uint64_t update_prob = 20000;
uint64_t del_prob = 5000;
uint64_t list_prob = 300;
uint64_t min_key_len = 10;
uint64_t max_key_len = 70;
uint64_t min_value_len = 50;
uint64_t max_value_len = 300;
uint64_t min_list_count = 10;
uint64_t max_list_count = 1000;
uint64_t print_stats_interval = 1;
bool json_output = false;
uint64_t log_level = 1;
bool trace = false;
bool stop_on_error = false;
// FIXME: Multiple clients
kv_test_stat_t stat, prev_stat;
timespec prev_stat_time, start_stat_time;
// State
kv_dbw_t *db = NULL;
ring_loop_t *ringloop = NULL;
epoll_manager_t *epmgr = NULL;
cluster_client_t *cli = NULL;
ring_consumer_t consumer;
bool finished = false;
uint64_t total_prob = 0;
uint64_t ops_sent = 0, ops_done = 0;
int stat_timer_id = -1;
int in_progress = 0;
bool reopening = false;
std::set<kv_test_listing_t*> listings;
std::set<std::string> changing_keys;
std::map<std::string, std::string> values;
~kv_test_t();
static json11::Json::object parse_args(int narg, const char *args[]);
void parse_config(json11::Json cfg);
void run(json11::Json cfg);
void loop();
void print_stats(kv_test_stat_t & prev_stat, timespec & prev_stat_time);
void print_total_stats();
void start_change(const std::string & key);
void stop_change(const std::string & key);
void add_stat(kv_test_lat_t & stat, timespec tv_begin);
};
kv_test_t::~kv_test_t()
{
if (db)
delete db;
if (cli)
{
cli->flush();
delete cli;
}
if (epmgr)
delete epmgr;
if (ringloop)
delete ringloop;
}
json11::Json::object kv_test_t::parse_args(int narg, const char *args[])
{
json11::Json::object cfg;
for (int i = 1; i < narg; i++)
{
if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help"))
{
printf(
"Vitastor Key/Value DB stress tester / benchmark\n"
"(c) Vitaliy Filippov, 2023+ (VNPL-1.1)\n"
"\n"
"USAGE: %s --pool_id POOL_ID --inode_id INODE_ID [OPTIONS]\n"
" --op_count 1000000\n"
" Total operations to run during test. 0 means unlimited\n"
" --key_prefix \"\"\n"
" Prefix for all keys read or written (to avoid collisions)\n"
" --key_suffix \"\"\n"
" Suffix for all keys read or written (to avoid collisions, but scan all DB)\n"
" --runtime 0\n"
" Run for this number of seconds. 0 means unlimited\n"
" --parallelism 4\n"
" Run this number of operations in parallel\n"
" --get_prob 30000\n"
" Fraction of key retrieve operations\n"
" --add_prob 20000\n"
" Fraction of key addition operations\n"
" --update_prob 20000\n"
" Fraction of key update operations\n"
" --del_prob 30000\n"
" Fraction of key delete operations\n"
" --list_prob 300\n"
" Fraction of listing operations\n"
" --min_key_len 10\n"
" Minimum key size in bytes\n"
" --max_key_len 70\n"
" Maximum key size in bytes\n"
" --min_value_len 50\n"
" Minimum value size in bytes\n"
" --max_value_len 300\n"
" Maximum value size in bytes\n"
" --min_list_count 10\n"
" Minimum number of keys read in listing (0 = all keys)\n"
" --max_list_count 1000\n"
" Maximum number of keys read in listing\n"
" --print_stats 1\n"
" Print operation statistics every this number of seconds\n"
" --json\n"
" JSON output\n"
" --stop_on_error 0\n"
" Stop on first execution error, mismatch, lost key or extra key during listing\n"
" --kv_memory_limit 128M\n"
" Maximum memory to use for vitastor-kv index cache\n"
" --kv_allocate_blocks 4\n"
" Number of PG blocks used for new tree block allocation in parallel\n"
" --kv_evict_max_misses 10\n"
" Eviction algorithm parameter: retry eviction from another random spot\n"
" if this number of keys is used currently or was used recently\n"
" --kv_evict_attempts_per_level 3\n"
" Retry eviction at most this number of times per tree level, starting\n"
" with bottom-most levels\n"
" --kv_evict_unused_age 1000\n"
" Evict only keys unused during this number of last operations\n"
" --kv_log_level 1\n"
" Log level. 0 = errors, 1 = warnings, 10 = trace operations\n",
exe_name
);
exit(0);
}
else if (args[i][0] == '-' && args[i][1] == '-')
{
const char *opt = args[i]+2;
cfg[opt] = !strcmp(opt, "json") || i == narg-1 ? "1" : args[++i];
}
}
return cfg;
}
void kv_test_t::parse_config(json11::Json cfg)
{
inode_id = INODE_WITH_POOL(cfg["pool_id"].uint64_value(), cfg["inode_id"].uint64_value());
if (cfg["op_count"].uint64_value() > 0)
op_count = cfg["op_count"].uint64_value();
key_prefix = cfg["key_prefix"].string_value();
key_suffix = cfg["key_suffix"].string_value();
if (cfg["runtime"].uint64_value() > 0)
runtime_sec = cfg["runtime"].uint64_value();
if (cfg["parallelism"].uint64_value() > 0)
parallelism = cfg["parallelism"].uint64_value();
if (!cfg["reopen_prob"].is_null())
reopen_prob = cfg["reopen_prob"].uint64_value();
if (!cfg["get_prob"].is_null())
get_prob = cfg["get_prob"].uint64_value();
if (!cfg["add_prob"].is_null())
add_prob = cfg["add_prob"].uint64_value();
if (!cfg["update_prob"].is_null())
update_prob = cfg["update_prob"].uint64_value();
if (!cfg["del_prob"].is_null())
del_prob = cfg["del_prob"].uint64_value();
if (!cfg["list_prob"].is_null())
list_prob = cfg["list_prob"].uint64_value();
if (!cfg["min_key_len"].is_null())
min_key_len = cfg["min_key_len"].uint64_value();
if (cfg["max_key_len"].uint64_value() > 0)
max_key_len = cfg["max_key_len"].uint64_value();
if (!cfg["min_value_len"].is_null())
min_value_len = cfg["min_value_len"].uint64_value();
if (cfg["max_value_len"].uint64_value() > 0)
max_value_len = cfg["max_value_len"].uint64_value();
if (!cfg["min_list_count"].is_null())
min_list_count = cfg["min_list_count"].uint64_value();
if (!cfg["max_list_count"].is_null())
max_list_count = cfg["max_list_count"].uint64_value();
if (!cfg["print_stats"].is_null())
print_stats_interval = cfg["print_stats"].uint64_value();
if (!cfg["json"].is_null())
json_output = true;
if (!cfg["stop_on_error"].is_null())
stop_on_error = cfg["stop_on_error"].bool_value();
if (!cfg["kv_memory_limit"].is_null())
kv_cfg["kv_memory_limit"] = cfg["kv_memory_limit"];
if (!cfg["kv_allocate_blocks"].is_null())
kv_cfg["kv_allocate_blocks"] = cfg["kv_allocate_blocks"];
if (!cfg["kv_evict_max_misses"].is_null())
kv_cfg["kv_evict_max_misses"] = cfg["kv_evict_max_misses"];
if (!cfg["kv_evict_attempts_per_level"].is_null())
kv_cfg["kv_evict_attempts_per_level"] = cfg["kv_evict_attempts_per_level"];
if (!cfg["kv_evict_unused_age"].is_null())
kv_cfg["kv_evict_unused_age"] = cfg["kv_evict_unused_age"];
if (!cfg["kv_log_level"].is_null())
{
log_level = cfg["kv_log_level"].uint64_value();
trace = log_level >= 10;
kv_cfg["kv_log_level"] = cfg["kv_log_level"];
}
total_prob = reopen_prob+get_prob+add_prob+update_prob+del_prob+list_prob;
stat.get.name = "get";
stat.add.name = "add";
stat.update.name = "update";
stat.del.name = "del";
stat.list.name = "list";
}
void kv_test_t::run(json11::Json cfg)
{
srand48(time(NULL));
parse_config(cfg);
// Create client
ringloop = new ring_loop_t(512);
epmgr = new epoll_manager_t(ringloop);
cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
db = new kv_dbw_t(cli);
// Load image metadata
while (!cli->is_ready())
{
ringloop->loop();
if (cli->is_ready())
break;
ringloop->wait();
}
// Run
reopening = true;
db->open(inode_id, kv_cfg, [this](int res)
{
reopening = false;
if (res < 0)
{
fprintf(stderr, "ERROR: Open index: %d (%s)\n", res, strerror(-res));
exit(1);
}
if (trace)
printf("Index opened\n");
ringloop->wakeup();
});
consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&consumer);
if (print_stats_interval)
stat_timer_id = epmgr->tfd->set_timer(print_stats_interval*1000, true, [this](int) { print_stats(prev_stat, prev_stat_time); });
clock_gettime(CLOCK_REALTIME, &start_stat_time);
prev_stat_time = start_stat_time;
while (!finished)
{
ringloop->loop();
if (!finished)
ringloop->wait();
}
if (stat_timer_id >= 0)
epmgr->tfd->clear_timer(stat_timer_id);
ringloop->unregister_consumer(&consumer);
// Print total stats
print_total_stats();
// Destroy the client
delete db;
db = NULL;
cli->flush();
delete cli;
delete epmgr;
delete ringloop;
cli = NULL;
epmgr = NULL;
ringloop = NULL;
}
static const char *base64_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789@+/";
std::string random_str(int len)
{
std::string str;
str.resize(len);
for (int i = 0; i < len; i++)
{
str[i] = base64_chars[lrand48() % 64];
}
return str;
}
void kv_test_t::loop()
{
if (reopening)
{
return;
}
if (ops_done >= op_count)
{
finished = true;
}
while (!finished && ops_sent < op_count && in_progress < parallelism)
{
uint64_t dice = (lrand48() % total_prob);
if (dice < reopen_prob)
{
reopening = true;
db->close([this]()
{
if (trace)
printf("Index closed\n");
db->open(inode_id, kv_cfg, [this](int res)
{
reopening = false;
if (res < 0)
{
fprintf(stderr, "ERROR: Reopen index: %d (%s)\n", res, strerror(-res));
finished = true;
return;
}
if (trace)
printf("Index reopened\n");
ringloop->wakeup();
});
});
return;
}
else if (dice < reopen_prob+get_prob)
{
// get existing
auto key = random_str(max_key_len);
auto k_it = values.lower_bound(key);
if (k_it == values.end())
continue;
key = k_it->first;
if (changing_keys.find(key) != changing_keys.end())
continue;
in_progress++;
ops_sent++;
if (trace)
printf("get %s\n", key.c_str());
timespec tv_begin;
clock_gettime(CLOCK_REALTIME, &tv_begin);
db->get(key, [this, key, tv_begin](int res, const std::string & value)
{
add_stat(stat.get, tv_begin);
ops_done++;
in_progress--;
auto it = values.find(key);
if (res != (it == values.end() ? -ENOENT : 0))
{
fprintf(stderr, "ERROR: get %s: %d (%s)\n", key.c_str(), res, strerror(-res));
if (stop_on_error)
exit(1);
}
else if (it != values.end() && value != it->second)
{
fprintf(stderr, "ERROR: get %s: mismatch: %s vs %s\n", key.c_str(), value.c_str(), it->second.c_str());
if (stop_on_error)
exit(1);
}
ringloop->wakeup();
});
}
else if (dice < reopen_prob+get_prob+add_prob+update_prob)
{
bool is_add = false;
std::string key;
if (dice < reopen_prob+get_prob+add_prob)
{
// add
is_add = true;
uint64_t key_len = min_key_len + (max_key_len > min_key_len ? lrand48() % (max_key_len-min_key_len) : 0);
key = key_prefix + random_str(key_len) + key_suffix;
}
else
{
// update
key = random_str(max_key_len);
auto k_it = values.lower_bound(key);
if (k_it == values.end())
continue;
key = k_it->first;
}
if (changing_keys.find(key) != changing_keys.end())
continue;
uint64_t value_len = min_value_len + (max_value_len > min_value_len ? lrand48() % (max_value_len-min_value_len) : 0);
auto value = random_str(value_len);
start_change(key);
ops_sent++;
in_progress++;
if (trace)
printf("set %s = %s\n", key.c_str(), value.c_str());
timespec tv_begin;
clock_gettime(CLOCK_REALTIME, &tv_begin);
db->set(key, value, [this, key, value, tv_begin, is_add](int res)
{
add_stat(is_add ? stat.add : stat.update, tv_begin);
stop_change(key);
ops_done++;
in_progress--;
if (res != 0)
{
fprintf(stderr, "ERROR: set %s = %s: %d (%s)\n", key.c_str(), value.c_str(), res, strerror(-res));
if (stop_on_error)
exit(1);
}
else
{
values[key] = value;
}
ringloop->wakeup();
}, NULL);
}
else if (dice < reopen_prob+get_prob+add_prob+update_prob+del_prob)
{
// delete
auto key = random_str(max_key_len);
auto k_it = values.lower_bound(key);
if (k_it == values.end())
continue;
key = k_it->first;
if (changing_keys.find(key) != changing_keys.end())
continue;
start_change(key);
ops_sent++;
in_progress++;
if (trace)
printf("del %s\n", key.c_str());
timespec tv_begin;
clock_gettime(CLOCK_REALTIME, &tv_begin);
db->del(key, [this, key, tv_begin](int res)
{
add_stat(stat.del, tv_begin);
stop_change(key);
ops_done++;
in_progress--;
if (res != 0)
{
fprintf(stderr, "ERROR: del %s: %d (%s)\n", key.c_str(), res, strerror(-res));
if (stop_on_error)
exit(1);
}
else
{
values.erase(key);
}
ringloop->wakeup();
}, NULL);
}
else if (dice < reopen_prob+get_prob+add_prob+update_prob+del_prob+list_prob)
{
// list
ops_sent++;
in_progress++;
auto key = random_str(max_key_len);
auto lst = new kv_test_listing_t;
auto k_it = values.lower_bound(key);
lst->count = min_list_count + (max_list_count > min_list_count ? lrand48() % (max_list_count-min_list_count) : 0);
lst->handle = db->list_start(k_it == values.begin() ? key_prefix : key);
lst->next_after = k_it == values.begin() ? key_prefix : key;
lst->inflights = changing_keys;
listings.insert(lst);
if (trace)
printf("list from %s\n", key.c_str());
clock_gettime(CLOCK_REALTIME, &lst->tv_begin);
db->list_next(lst->handle, [this, lst](int res, const std::string & key, const std::string & value)
{
if (log_level >= 11)
printf("list: %s = %s\n", key.c_str(), value.c_str());
if (res >= 0 && key_prefix.size() && (key.size() < key_prefix.size() ||
key.substr(0, key_prefix.size()) != key_prefix))
{
// stop at this key
res = -ENOENT;
}
if (res < 0 || (lst->count > 0 && lst->done >= lst->count))
{
add_stat(stat.list, lst->tv_begin);
if (res == 0)
{
// ok (done >= count)
}
else if (res != -ENOENT)
{
fprintf(stderr, "ERROR: list: %d (%s)\n", res, strerror(-res));
lst->error = true;
}
else
{
auto k_it = lst->next_after == "" ? values.begin() : values.upper_bound(lst->next_after);
while (k_it != values.end())
{
while (k_it != values.end() && lst->inflights.find(k_it->first) != lst->inflights.end())
k_it++;
if (k_it != values.end())
{
fprintf(stderr, "ERROR: list: missing key %s\n", (k_it++)->first.c_str());
lst->error = true;
}
}
}
if (lst->error && stop_on_error)
exit(1);
ops_done++;
in_progress--;
db->list_close(lst->handle);
delete lst;
listings.erase(lst);
ringloop->wakeup();
}
else
{
stat.list_keys++;
// Do not check modified keys in listing
// Listing may return their old or new state
if ((!key_suffix.size() || key.size() >= key_suffix.size() &&
key.substr(key.size()-key_suffix.size()) == key_suffix) &&
lst->inflights.find(key) == lst->inflights.end())
{
lst->done++;
auto k_it = lst->next_after == "" ? values.begin() : values.upper_bound(lst->next_after);
while (true)
{
while (k_it != values.end() && lst->inflights.find(k_it->first) != lst->inflights.end())
{
k_it++;
}
if (k_it == values.end() || k_it->first > key)
{
fprintf(stderr, "ERROR: list: extra key %s\n", key.c_str());
lst->error = true;
break;
}
else if (k_it->first < key)
{
fprintf(stderr, "ERROR: list: missing key %s\n", k_it->first.c_str());
lst->error = true;
lst->next_after = k_it->first;
k_it++;
}
else
{
if (k_it->second != value)
{
fprintf(stderr, "ERROR: list: mismatch: %s = %s but should be %s\n",
key.c_str(), value.c_str(), k_it->second.c_str());
lst->error = true;
}
lst->next_after = k_it->first;
break;
}
}
}
db->list_next(lst->handle, NULL);
}
});
}
}
}
void kv_test_t::add_stat(kv_test_lat_t & stat, timespec tv_begin)
{
timespec tv_end;
clock_gettime(CLOCK_REALTIME, &tv_end);
int64_t usec = (tv_end.tv_sec - tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - tv_begin.tv_nsec)/1000;
if (usec > 0)
{
stat.usec += usec;
stat.count++;
}
}
void kv_test_t::print_stats(kv_test_stat_t & prev_stat, timespec & prev_stat_time)
{
timespec cur_stat_time;
clock_gettime(CLOCK_REALTIME, &cur_stat_time);
int64_t usec = (cur_stat_time.tv_sec - prev_stat_time.tv_sec)*1000000 +
(cur_stat_time.tv_nsec - prev_stat_time.tv_nsec)/1000;
if (usec > 0)
{
kv_test_lat_t *lats[] = { &stat.get, &stat.add, &stat.update, &stat.del, &stat.list };
kv_test_lat_t *prev[] = { &prev_stat.get, &prev_stat.add, &prev_stat.update, &prev_stat.del, &prev_stat.list };
if (!json_output)
{
char buf[128] = { 0 };
for (int i = 0; i < sizeof(lats)/sizeof(lats[0]); i++)
{
snprintf(buf, sizeof(buf)-1, "%.1f %s/s (%lu us)", (lats[i]->count-prev[i]->count)*1000000.0/usec,
lats[i]->name, (lats[i]->usec-prev[i]->usec)/(lats[i]->count-prev[i]->count > 0 ? lats[i]->count-prev[i]->count : 1));
int k;
for (k = strlen(buf); k < strlen(lats[i]->name)+21; k++)
buf[k] = ' ';
buf[k] = 0;
printf("%s", buf);
}
printf("\n");
}
else
{
int64_t runtime = (cur_stat_time.tv_sec - start_stat_time.tv_sec)*1000000 +
(cur_stat_time.tv_nsec - start_stat_time.tv_nsec)/1000;
printf("{\"runtime\":%.1f", (double)runtime/1000000.0);
for (int i = 0; i < sizeof(lats)/sizeof(lats[0]); i++)
{
if (lats[i]->count > prev[i]->count)
{
printf(
",\"%s\":{\"avg\":{\"iops\":%.1f,\"usec\":%lu},\"total\":{\"count\":%lu,\"usec\":%lu}}",
lats[i]->name, (lats[i]->count-prev[i]->count)*1000000.0/usec,
(lats[i]->usec-prev[i]->usec)/(lats[i]->count-prev[i]->count),
lats[i]->count, lats[i]->usec
);
}
}
printf("}\n");
}
}
prev_stat = stat;
prev_stat_time = cur_stat_time;
}
void kv_test_t::print_total_stats()
{
if (!json_output)
printf("Total:\n");
kv_test_stat_t start_stats;
timespec start_stat_time = this->start_stat_time;
print_stats(start_stats, start_stat_time);
}
void kv_test_t::start_change(const std::string & key)
{
changing_keys.insert(key);
for (auto lst: listings)
{
lst->inflights.insert(key);
}
}
void kv_test_t::stop_change(const std::string & key)
{
changing_keys.erase(key);
}
int main(int narg, const char *args[])
{
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
exe_name = args[0];
kv_test_t *p = new kv_test_t();
p->run(kv_test_t::parse_args(narg, args));
delete p;
return 0;
}

View File

@@ -22,7 +22,7 @@ void osd_messenger_t::init()
{
rdma_context = msgr_rdma_context_t::create(
rdma_device != "" ? rdma_device.c_str() : NULL,
rdma_port_num, rdma_gid_index, rdma_mtu, rdma_odp, log_level
rdma_port_num, rdma_gid_index, rdma_mtu, log_level
);
if (!rdma_context)
{
@@ -42,6 +42,12 @@ void osd_messenger_t::init()
handle_rdma_events();
}
}
#endif
#ifndef SO_ZEROCOPY
if (log_level > 0)
{
fprintf(stderr, "Zero-copy TCP send is not supported in this build, ignoring\n");
}
#endif
keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
{
@@ -167,13 +173,14 @@ void osd_messenger_t::parse_config(const json11::Json & config)
this->rdma_max_msg = config["rdma_max_msg"].uint64_value();
if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024)
this->rdma_max_msg = 129*1024;
this->rdma_odp = config["rdma_odp"].bool_value();
#endif
this->receive_buffer_size = (uint32_t)config["tcp_header_buffer_size"].uint64_value();
if (!this->receive_buffer_size || this->receive_buffer_size > 1024*1024*1024)
this->receive_buffer_size = 65536;
this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
config["use_sync_send_recv"].uint64_value();
this->use_zerocopy_send = config["use_zerocopy_send"].bool_value() ||
config["use_zerocopy_send"].uint64_value();
this->peer_connect_interval = config["peer_connect_interval"].uint64_value();
if (!this->peer_connect_interval)
this->peer_connect_interval = 5;
@@ -304,8 +311,7 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd)
on_connect_peer(peer_osd, -result);
return;
}
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
set_socket_options(cl);
cl->peer_state = PEER_CONNECTED;
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
{
@@ -315,6 +321,23 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd)
check_peer_config(cl);
}
void osd_messenger_t::set_socket_options(osd_client_t *cl)
{
int one = 1;
setsockopt(cl->peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
#ifdef SO_ZEROCOPY
if (!use_zerocopy_send)
cl->zerocopy_send = false;
else if (setsockopt(cl->peer_fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) != 0)
{
if (log_level > 0)
fprintf(stderr, "[OSD %lu] Failed to enable zero-copy send for client %d: %s\n", this->osd_num, cl->peer_fd, strerror(errno));
}
else
cl->zerocopy_send = true;
#endif
}
void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events)
{
// Mark client as ready (i.e. some data is available)
@@ -491,14 +514,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
fprintf(stderr, "Connected to OSD %lu using RDMA\n", cl->osd_num);
}
cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
{
// Do not miss the disconnection!
if (epoll_events & EPOLLRDHUP)
{
handle_peer_epoll(peer_fd, epoll_events);
}
});
tfd->set_fd_handler(cl->peer_fd, false, NULL);
// Add the initial receive request
try_recv_rdma(cl);
}
@@ -523,14 +539,13 @@ void osd_messenger_t::accept_connections(int listen_fd)
fprintf(stderr, "[OSD %lu] new client %d: connection from %s\n", this->osd_num, peer_fd,
addr_to_string(addr).c_str());
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
clients[peer_fd] = new osd_client_t();
clients[peer_fd]->peer_addr = addr;
clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port);
clients[peer_fd]->peer_fd = peer_fd;
clients[peer_fd]->peer_state = PEER_CONNECTED;
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);
auto cl = clients[peer_fd] = new osd_client_t();
cl->peer_addr = addr;
cl->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port);
cl->peer_fd = peer_fd;
cl->peer_state = PEER_CONNECTED;
cl->in_buf = malloc_or_die(receive_buffer_size);
set_socket_options(cl);
// Add FD to epoll
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
{

View File

@@ -45,6 +45,12 @@ struct msgr_rdma_connection_t;
struct msgr_rdma_context_t;
#endif
struct msgr_zc_not_t
{
osd_op_t *op;
uint32_t nsend;
};
struct osd_client_t
{
int refs = 0;
@@ -57,6 +63,7 @@ struct osd_client_t
int ping_time_remaining = 0;
int idle_time_remaining = 0;
osd_num_t osd_num = 0;
bool zerocopy_send = false;
void *in_buf = NULL;
@@ -87,6 +94,12 @@ struct osd_client_t
int write_state = 0;
std::vector<iovec> send_list, next_send_list;
std::vector<msgr_sendp_t> outbox, next_outbox;
std::vector<msgr_zc_not_t> zerocopy_sent;
uint64_t outbox_size = 0, next_outbox_size = 0;
uint32_t zerocopy_notification_idx = 0;
uint32_t zerocopy_notification_prev = 0;
uint8_t zerocopy_notification_buf[256];
struct msghdr zerocopy_notification_msg;
~osd_client_t();
};
@@ -123,6 +136,7 @@ protected:
int osd_ping_timeout = 0;
int log_level = 0;
bool use_sync_send_recv = false;
bool use_zerocopy_send = false;
#ifdef WITH_RDMA
bool use_rdma = true;
@@ -131,7 +145,6 @@ protected:
msgr_rdma_context_t *rdma_context = NULL;
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
uint64_t rdma_max_msg = 0;
bool rdma_odp = false;
#endif
std::vector<int> read_ready_clients;
@@ -186,9 +199,11 @@ protected:
void check_peer_config(osd_client_t *cl);
void cancel_osd_ops(osd_client_t *cl);
void cancel_op(osd_op_t *op);
void set_socket_options(osd_client_t *cl);
bool try_send(osd_client_t *cl);
void handle_send(int result, osd_client_t *cl);
void handle_zerocopy_notification(osd_client_t *cl, int res);
bool handle_read(int result, osd_client_t *cl);
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
@@ -198,9 +213,7 @@ protected:
void handle_reply_ready(osd_op_t *op);
#ifdef WITH_RDMA
void try_send_rdma(osd_client_t *cl);
void try_send_rdma_odp(osd_client_t *cl);
void try_send_rdma_nodp(osd_client_t *cl);
bool try_send_rdma(osd_client_t *cl);
bool try_recv_rdma(osd_client_t *cl);
void handle_rdma_events();
#endif

View File

@@ -47,29 +47,11 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
if (qp)
ibv_destroy_qp(qp);
if (recv_buffers.size())
{
for (auto b: recv_buffers)
{
if (b.mr)
ibv_dereg_mr(b.mr);
free(b.buf);
}
recv_buffers.clear();
}
if (send_out.mr)
{
ibv_dereg_mr(send_out.mr);
send_out.mr = NULL;
}
if (send_out.buf)
{
free(send_out.buf);
send_out.buf = NULL;
}
send_out_size = 0;
free(b);
}
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, bool odp, int log_level)
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level)
{
int res;
ibv_device **dev_list = NULL;
@@ -154,27 +136,21 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
fprintf(stderr, "Couldn't query RDMA device for its features\n");
goto cleanup;
}
ctx->odp = odp;
if (ctx->odp &&
(!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)))
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
{
ctx->odp = false;
if (log_level > 0)
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable, disabling it\n");
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n");
goto cleanup;
}
}
if (ctx->odp)
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
if (!ctx->mr)
{
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
if (!ctx->mr)
{
fprintf(stderr, "Couldn't register RDMA memory region\n");
goto cleanup;
}
fprintf(stderr, "Couldn't register RDMA memory region\n");
goto cleanup;
}
ctx->channel = ibv_create_comp_channel(ctx->context);
@@ -389,34 +365,12 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
cl->rdma_conn->cur_send++;
}
static int try_send_rdma_copy(osd_client_t *cl, uint8_t *dst, int dst_len)
{
auto rc = cl->rdma_conn;
int total_dst_len = dst_len;
while (dst_len > 0 && rc->send_pos < cl->send_list.size())
{
iovec & iov = cl->send_list[rc->send_pos];
uint32_t len = (uint32_t)(iov.iov_len-rc->send_buf_pos < dst_len
? iov.iov_len-rc->send_buf_pos : dst_len);
memcpy(dst, iov.iov_base+rc->send_buf_pos, len);
dst += len;
dst_len -= len;
rc->send_buf_pos += len;
if (rc->send_buf_pos >= iov.iov_len)
{
rc->send_pos++;
rc->send_buf_pos = 0;
}
}
return total_dst_len-dst_len;
}
void osd_messenger_t::try_send_rdma_odp(osd_client_t *cl)
bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
{
auto rc = cl->rdma_conn;
if (!cl->send_list.size() || rc->cur_send >= rc->max_send)
{
return;
return true;
}
uint64_t op_size = 0, op_sge = 0;
ibv_sge sge[rc->max_sge];
@@ -454,70 +408,15 @@ void osd_messenger_t::try_send_rdma_odp(osd_client_t *cl)
rc->send_sizes.push_back(op_size);
try_send_rdma_wr(cl, sge, op_sge);
}
return true;
}
void osd_messenger_t::try_send_rdma_nodp(osd_client_t *cl)
{
auto rc = cl->rdma_conn;
if (!rc->send_out_size)
{
// Allocate send ring buffer, if not yet
rc->send_out_size = rc->max_msg*rdma_max_send;
rc->send_out.buf = malloc_or_die(rc->send_out_size);
if (!rdma_context->odp)
{
rc->send_out.mr = ibv_reg_mr(rdma_context->pd, rc->send_out.buf, rc->send_out_size, 0);
if (!rc->send_out.mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
exit(1);
}
}
}
// Copy data into the buffer and send it
uint8_t *dst = NULL;
int dst_len = 0;
int copied = 1;
while (!rc->send_out_full && copied > 0 && rc->cur_send < rc->max_send)
{
dst = (uint8_t*)rc->send_out.buf + rc->send_out_pos;
dst_len = (rc->send_out_pos < rc->send_out_size ? rc->send_out_size-rc->send_out_pos : rc->send_done_pos-rc->send_out_pos);
if (dst_len > rc->max_msg)
dst_len = rc->max_msg;
copied = try_send_rdma_copy(cl, dst, dst_len);
if (copied > 0)
{
rc->send_out_pos += copied;
if (rc->send_out_pos == rc->send_out_size)
rc->send_out_pos = 0;
assert(rc->send_out_pos < rc->send_out_size);
if (rc->send_out_pos >= rc->send_done_pos)
rc->send_out_full = true;
ibv_sge sge = {
.addr = (uintptr_t)dst,
.length = (uint32_t)copied,
.lkey = rdma_context->odp ? rdma_context->mr->lkey : rc->send_out.mr->lkey,
};
try_send_rdma_wr(cl, &sge, 1);
rc->send_sizes.push_back(copied);
}
}
}
void osd_messenger_t::try_send_rdma(osd_client_t *cl)
{
if (rdma_context->odp)
try_send_rdma_odp(cl);
else
try_send_rdma_nodp(cl);
}
static void try_recv_rdma_wr(osd_client_t *cl, msgr_rdma_buf_t b)
static void try_recv_rdma_wr(osd_client_t *cl, void *buf)
{
ibv_sge sge = {
.addr = (uintptr_t)b.buf,
.addr = (uintptr_t)buf,
.length = (uint32_t)cl->rdma_conn->max_msg,
.lkey = cl->rdma_conn->ctx->odp ? cl->rdma_conn->ctx->mr->lkey : b.mr->lkey,
.lkey = cl->rdma_conn->ctx->mr->lkey,
};
ibv_recv_wr *bad_wr = NULL;
ibv_recv_wr wr = {
@@ -539,19 +438,9 @@ bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
auto rc = cl->rdma_conn;
while (rc->cur_recv < rc->max_recv)
{
msgr_rdma_buf_t b;
b.buf = malloc_or_die(rc->max_msg);
if (!rdma_context->odp)
{
b.mr = ibv_reg_mr(rdma_context->pd, b.buf, rc->max_msg, IBV_ACCESS_LOCAL_WRITE);
if (!b.mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
exit(1);
}
}
rc->recv_buffers.push_back(b);
try_recv_rdma_wr(cl, b);
void *buf = malloc_or_die(rc->max_msg);
rc->recv_buffers.push_back(buf);
try_recv_rdma_wr(cl, buf);
}
return true;
}
@@ -603,7 +492,7 @@ void osd_messenger_t::handle_rdma_events()
if (!is_send)
{
rc->cur_recv--;
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf].buf, wc[i].byte_len))
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len))
{
// handle_read_buffer may stop the client
continue;
@@ -616,14 +505,6 @@ void osd_messenger_t::handle_rdma_events()
rc->cur_send--;
uint64_t sent_size = rc->send_sizes.at(0);
rc->send_sizes.erase(rc->send_sizes.begin(), rc->send_sizes.begin()+1);
if (!rdma_context->odp)
{
rc->send_done_pos += sent_size;
rc->send_out_full = false;
if (rc->send_done_pos == rc->send_out_size)
rc->send_done_pos = 0;
assert(rc->send_done_pos < rc->send_out_size);
}
int send_pos = 0, send_buf_pos = 0;
while (sent_size > 0)
{

View File

@@ -23,7 +23,6 @@ struct msgr_rdma_context_t
ibv_device *dev = NULL;
ibv_device_attr_ex attrx;
ibv_pd *pd = NULL;
bool odp = false;
ibv_mr *mr = NULL;
ibv_comp_channel *channel = NULL;
ibv_cq *cq = NULL;
@@ -36,16 +35,10 @@ struct msgr_rdma_context_t
int max_cqe = 0;
int used_max_cqe = 0;
static msgr_rdma_context_t *create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, bool odp, int log_level);
static msgr_rdma_context_t *create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level);
~msgr_rdma_context_t();
};
struct msgr_rdma_buf_t
{
void *buf = NULL;
ibv_mr *mr = NULL;
};
struct msgr_rdma_connection_t
{
msgr_rdma_context_t *ctx = NULL;
@@ -57,11 +50,8 @@ struct msgr_rdma_connection_t
int send_pos = 0, send_buf_pos = 0;
int next_recv_buf = 0;
std::vector<msgr_rdma_buf_t> recv_buffers;
std::vector<void*> recv_buffers;
std::vector<uint64_t> send_sizes;
msgr_rdma_buf_t send_out;
int send_out_pos = 0, send_done_pos = 0, send_out_size = 0;
bool send_out_full = false;
~msgr_rdma_connection_t();
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);

View File

@@ -3,10 +3,15 @@
#define _XOPEN_SOURCE
#include <limits.h>
#include <sys/epoll.h>
#include "messenger.h"
#include <linux/errqueue.h>
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0
#endif
void osd_messenger_t::outbox_push(osd_op_t *cur_op)
{
assert(cur_op->peer_fd);
@@ -37,6 +42,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
}
auto & to_send_list = cl->write_msg.msg_iovlen ? cl->next_send_list : cl->send_list;
auto & to_outbox = cl->write_msg.msg_iovlen ? cl->next_outbox : cl->outbox;
auto & to_size = cl->write_msg.msg_iovlen ? cl->next_outbox_size : cl->outbox_size;
if (cur_op->op_type == OSD_OP_IN)
{
measure_exec(cur_op);
@@ -47,6 +53,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE });
cl->sent_ops[cur_op->req.hdr.id] = cur_op;
}
to_size += OSD_PACKET_SIZE;
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_HDR });
// Bitmap
if (cur_op->op_type == OSD_OP_IN &&
@@ -58,6 +65,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
.iov_len = cur_op->reply.sec_rw.attr_len,
});
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
to_size += cur_op->reply.sec_rw.attr_len;
}
else if (cur_op->op_type == OSD_OP_OUT &&
(cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) &&
@@ -68,6 +76,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
.iov_len = cur_op->req.sec_rw.attr_len,
});
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
to_size += cur_op->req.sec_rw.attr_len;
}
// Operation data
if ((cur_op->op_type == OSD_OP_IN
@@ -90,15 +99,22 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
assert(cur_op->iov.buf[i].iov_base);
to_send_list.push_back(cur_op->iov.buf[i]);
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
to_size += cur_op->iov.buf[i].iov_len;
}
}
}
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
{
if (cur_op->op_type == OSD_OP_IN && cur_op->reply.hdr.retval > 0)
{
to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->reply.hdr.retval });
to_size += cur_op->reply.hdr.retval;
}
else if (cur_op->op_type == OSD_OP_OUT && cur_op->req.sec_read_bmp.len > 0)
{
to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->req.sec_read_bmp.len });
to_size += cur_op->req.sec_read_bmp.len;
}
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
}
if (cur_op->op_type == OSD_OP_IN)
@@ -184,17 +200,19 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
}
cl->write_msg.msg_iov = cl->send_list.data();
cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
cl->write_msg.msg_flags = (cl->zerocopy_send && (cl->outbox_size/cl->send_list.size()) >= 4096 ? MSG_ZEROCOPY : 0);
cl->refs++;
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_send(data->res, cl); };
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, 0);
my_uring_prep_sendmsg(sqe, peer_fd, &cl->write_msg, cl->write_msg.msg_flags);
}
else
{
cl->write_msg.msg_iov = cl->send_list.data();
cl->write_msg.msg_iovlen = cl->send_list.size() < IOV_MAX ? cl->send_list.size() : IOV_MAX;
cl->write_msg.msg_flags = (cl->zerocopy_send && (cl->outbox_size/cl->send_list.size()) >= 4096 ? MSG_ZEROCOPY : 0);
cl->refs++;
int result = sendmsg(peer_fd, &cl->write_msg, MSG_NOSIGNAL);
int result = sendmsg(peer_fd, &cl->write_msg, MSG_NOSIGNAL | cl->write_msg.msg_flags);
if (result < 0)
{
result = -errno;
@@ -204,6 +222,62 @@ bool osd_messenger_t::try_send(osd_client_t *cl)
return true;
}
void osd_messenger_t::handle_zerocopy_notification(osd_client_t *cl, int res)
{
cl->refs--;
if (cl->peer_state == PEER_STOPPED)
{
if (cl->refs <= 0)
{
delete cl;
}
return;
}
if (res != 0)
{
return;
}
if (cl->zerocopy_notification_msg.msg_flags & MSG_CTRUNC)
{
fprintf(stderr, "zero-copy send notification truncated on client socket %d\n", cl->peer_fd);
return;
}
for (struct cmsghdr *cm = CMSG_FIRSTHDR(&cl->zerocopy_notification_msg); cm; cm = CMSG_NXTHDR(&cl->zerocopy_notification_msg, cm))
{
if (cm->cmsg_level == SOL_IP && cm->cmsg_type == IP_RECVERR)
{
struct sock_extended_err *serr = (struct sock_extended_err*)CMSG_DATA(cm);
if (serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY)
{
// completed sends numbered serr->ee_info .. serr->ee_data
int start = 0;
while (start < cl->zerocopy_sent.size() && cl->zerocopy_sent[start].nsend < serr->ee_info)
start++;
int end = start;
if (serr->ee_data < serr->ee_info)
{
// counter has wrapped around
while (end < cl->zerocopy_sent.size() && cl->zerocopy_sent[end].nsend >= cl->zerocopy_sent[start].nsend)
end++;
}
while (end < cl->zerocopy_sent.size() && cl->zerocopy_sent[end].nsend <= serr->ee_data)
end++;
if (end > start)
{
for (int i = start; i < end; i++)
{
delete cl->zerocopy_sent[i].op;
}
cl->zerocopy_sent.erase(
cl->zerocopy_sent.begin() + start,
cl->zerocopy_sent.begin() + end
);
}
}
}
}
}
void osd_messenger_t::send_replies()
{
for (int i = 0; i < write_ready_clients.size(); i++)
@@ -231,16 +305,19 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
}
return;
}
if (result < 0 && result != -EAGAIN && result != -EINTR)
if (result < 0 && result != -EAGAIN && result != -EINTR && result != -ENOBUFS)
{
// this is a client socket, so don't panic. just disconnect it
fprintf(stderr, "Client %d socket write error: %d (%s). Disconnecting client\n", cl->peer_fd, -result, strerror(-result));
stop_client(cl->peer_fd);
return;
}
bool used_zerocopy = false;
if (result >= 0)
{
used_zerocopy = (cl->write_msg.msg_flags & MSG_ZEROCOPY) ? true : false;
int done = 0;
int bytes_written = result;
while (result > 0 && done < cl->send_list.size())
{
iovec & iov = cl->send_list[done];
@@ -249,7 +326,19 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
if (cl->outbox[done].flags & MSGR_SENDP_FREE)
{
// Reply fully sent
delete cl->outbox[done].op;
if (!used_zerocopy)
{
delete cl->outbox[done].op;
}
else
{
// With zero-copy send the difference is that we must keep the buffer (i.e. the operation)
// allocated until we get send notification from MSG_ERRQUEUE
cl->zerocopy_sent.push_back((msgr_zc_not_t){
.op = cl->outbox[done].op,
.nsend = cl->zerocopy_notification_idx,
});
}
}
result -= iov.iov_len;
done++;
@@ -261,6 +350,11 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
break;
}
}
if (used_zerocopy)
{
cl->zerocopy_notification_idx++;
}
cl->outbox_size -= bytes_written;
if (done > 0)
{
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+done);
@@ -270,8 +364,10 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
{
cl->send_list.insert(cl->send_list.end(), cl->next_send_list.begin(), cl->next_send_list.end());
cl->outbox.insert(cl->outbox.end(), cl->next_outbox.begin(), cl->next_outbox.end());
cl->outbox_size += cl->next_outbox_size;
cl->next_send_list.clear();
cl->next_outbox.clear();
cl->next_outbox_size = 0;
}
cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0;
#ifdef WITH_RDMA
@@ -284,14 +380,7 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
fprintf(stderr, "Successfully connected with client %d using RDMA\n", cl->peer_fd);
}
cl->peer_state = PEER_RDMA;
tfd->set_fd_handler(cl->peer_fd, false, [this](int peer_fd, int epoll_events)
{
// Do not miss the disconnection!
if (epoll_events & EPOLLRDHUP)
{
handle_peer_epoll(peer_fd, epoll_events);
}
});
tfd->set_fd_handler(cl->peer_fd, false, NULL);
// Add the initial receive request
try_recv_rdma(cl);
}
@@ -301,4 +390,34 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
{
write_ready_clients.push_back(cl->peer_fd);
}
if (used_zerocopy && (cl->zerocopy_notification_idx-cl->zerocopy_notification_prev) >= 16 &&
cl->zerocopy_sent.size() > 0)
{
cl->zerocopy_notification_prev = cl->zerocopy_notification_idx;
cl->zerocopy_notification_msg = {
.msg_control = cl->zerocopy_notification_buf,
.msg_controllen = sizeof(cl->zerocopy_notification_buf),
};
cl->refs++;
io_uring_sqe* sqe = NULL;
if (ringloop && !use_sync_send_recv)
{
sqe = ringloop->get_sqe();
}
if (!sqe)
{
int res = recvmsg(cl->peer_fd, &cl->zerocopy_notification_msg, MSG_ERRQUEUE|MSG_DONTWAIT);
if (res < 0)
{
res = -errno;
}
handle_zerocopy_notification(cl, res);
}
else
{
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, cl](ring_data_t *data) { handle_zerocopy_notification(cl, data->res); };
my_uring_prep_recvmsg(sqe, cl->peer_fd, &cl->zerocopy_notification_msg, MSG_ERRQUEUE);
}
}
}

View File

@@ -225,7 +225,7 @@ public:
cfg = obj;
}
// Create client
ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
ringloop = new ring_loop_t(512);
epmgr = new epoll_manager_t(ringloop);
cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
if (!inode)

View File

@@ -124,7 +124,7 @@ void nfs_proxy_t::run(json11::Json cfg)
cfg = obj;
}
// Create client
ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
ringloop = new ring_loop_t(512);
epmgr = new epoll_manager_t(ringloop);
cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
cmd = new cli_tool_t();

View File

@@ -541,15 +541,11 @@ void osd_t::print_slow()
}
else if (op->req.hdr.opcode == OSD_OP_SEC_STABILIZE || op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)
{
for (uint64_t i = 0; i < op->req.sec_stab.len && i < sizeof(obj_ver_id)*12; i += sizeof(obj_ver_id))
for (uint64_t i = 0; i < op->req.sec_stab.len; i += sizeof(obj_ver_id))
{
obj_ver_id *ov = (obj_ver_id*)((uint8_t*)op->buf + i);
bufprintf(i == 0 ? " %lx:%lx v%lu" : ", %lx:%lx v%lu", ov->oid.inode, ov->oid.stripe, ov->version);
}
if (op->req.sec_stab.len > sizeof(obj_ver_id)*12)
{
bufprintf(", ... (%lu items)", op->req.sec_stab.len/sizeof(obj_ver_id));
}
}
else if (op->req.hdr.opcode == OSD_OP_SEC_LIST)
{

View File

@@ -19,14 +19,6 @@ static void handle_sigint(int sig)
exit(0);
}
static const char* help_text =
"Vitastor OSD (block object storage daemon) " VERSION "\n"
"(c) Vitaliy Filippov, 2019+ (VNPL-1.1)\n"
"\n"
"OSDs are usually started by vitastor-disk.\n"
"Manual usage: vitastor-osd [--option value] ...\n"
;
int main(int narg, char *args[])
{
setvbuf(stdout, NULL, _IONBF, 0);
@@ -45,20 +37,10 @@ int main(int narg, char *args[])
char *opt = args[i]+2;
config[std::string(opt)] = std::string(args[++i]);
}
else if (!strcmp(args[i], "--help"))
{
printf("%s", help_text);
return 0;
}
}
if (!config.size())
{
printf("%s", help_text);
return 1;
}
signal(SIGINT, handle_sigint);
signal(SIGTERM, handle_sigint);
ring_loop_t *ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
ring_loop_t *ringloop = new ring_loop_t(512);
osd = new osd_t(config, ringloop);
while (1)
{

View File

@@ -17,7 +17,7 @@ ring_loop_t::ring_loop_t(int qd)
{
throw std::runtime_error(std::string("io_uring_queue_init: ") + strerror(-ret));
}
free_ring_data_ptr = *ring.sq.kring_entries;
free_ring_data_ptr = *ring.cq.kring_entries;
ring_datas = (struct ring_data_t*)calloc(free_ring_data_ptr, sizeof(ring_data_t));
free_ring_data = (int*)malloc(sizeof(int) * free_ring_data_ptr);
if (!ring_datas || !free_ring_data)

View File

@@ -15,8 +15,6 @@
#include <functional>
#include <vector>
#define RINGLOOP_DEFAULT_SIZE 1024
static inline void my_uring_prep_rw(int op, struct io_uring_sqe *sqe, int fd, const void *addr, unsigned len, off_t offset)
{
// Prepare a read/write operation without clearing user_data
@@ -141,9 +139,11 @@ public:
if (free_ring_data_ptr == 0)
return NULL;
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
assert(sqe);
*sqe = { 0 };
io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
if (sqe)
{
*sqe = { 0 };
io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
}
return sqe;
}
inline void set_immediate(const std::function<void()> cb)

View File

@@ -30,7 +30,7 @@ void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op);
int main(int narg, char *args[])
{
ring_consumer_t looper;
ring_loop_t *ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
ring_loop_t *ringloop = new ring_loop_t(512);
epoll_manager_t *epmgr = new epoll_manager_t(ringloop);
osd_messenger_t *msgr = new osd_messenger_t();
msgr->osd_num = 1351;

View File

@@ -11,7 +11,7 @@ int main(int narg, char *args[])
config["meta_device"] = "./test_meta.bin";
config["journal_device"] = "./test_journal.bin";
config["data_device"] = "./test_data.bin";
ring_loop_t *ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
ring_loop_t *ringloop = new ring_loop_t(512);
epoll_manager_t *epmgr = new epoll_manager_t(ringloop);
blockstore_t *bs = new blockstore_t(config, ringloop, epmgr->tfd);

View File

@@ -68,7 +68,7 @@ int main(int narg, char *args[])
| cfg["inode_id"].uint64_value();
uint64_t base_ver = 0;
// Create client
auto ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
auto ringloop = new ring_loop_t(512);
auto epmgr = new epoll_manager_t(ringloop);
auto cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
cli->on_ready([&]()

View File

@@ -6,7 +6,7 @@ includedir=${prefix}/@CMAKE_INSTALL_INCLUDEDIR@
Name: Vitastor
Description: Vitastor client library
Version: 1.2.0
Version: 1.1.0
Libs: -L${libdir} -lvitastor_client
Cflags: -I${includedir}

View File

@@ -114,7 +114,7 @@ vitastor_c *vitastor_c_create_qemu_uring(QEMUSetFDHandler *aio_set_fd_handler, v
ring_loop_t *ringloop = NULL;
try
{
ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
ringloop = new ring_loop_t(512);
}
catch (std::exception & e)
{
@@ -136,7 +136,7 @@ vitastor_c *vitastor_c_create_uring(const char *config_path, const char *etcd_ho
ring_loop_t *ringloop = NULL;
try
{
ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
ringloop = new ring_loop_t(512);
}
catch (std::exception & e)
{
@@ -167,7 +167,7 @@ vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len)
ring_loop_t *ringloop = NULL;
try
{
ringloop = new ring_loop_t(RINGLOOP_DEFAULT_SIZE);
ringloop = new ring_loop_t(512);
}
catch (std::exception & e)
{