Compare commits

..

14 Commits

Author SHA1 Message Date
fc83e3821c Fix write-over-delete failing for the very first entry in dirty_db 2023-10-21 17:08:32 +03:00
0d707fc83b Fix possible segfault in vitastor-cli ls -l 2023-10-21 17:08:32 +03:00
4f99f78430 Fix possible OSD crash during sync due to missing min_flushed_journal_sector reset 2023-09-17 00:36:46 +03:00
f926f8c2e0 Remove unused bs_sync fields 2023-09-17 00:36:46 +03:00
d73ad12c56 Fix fio_sec_osd attr_len 2023-09-17 00:36:46 +03:00
d68cec10e2 Remove erroneous block_size mismatch warnings on pools without matching PGs 2023-09-17 00:36:46 +03:00
9afa200a33 Flush STDOUT and STDERR before exiting from cli to fix Proxmox "Unexpected result" 2023-09-17 00:36:46 +03:00
aff6f3e970 Fix sscanf validation usage (field count instead of null_byte == 0) 2023-09-07 11:34:42 +03:00
49fca80f1c Add supported_truncate_flags 2023-09-07 11:34:42 +03:00
a028b4fa4c Make QEMU driver compatible with QEMU 8.1 2023-08-24 19:09:00 +03:00
da2bfd0b1e Fix co_truncate size division by BDRV_SECTOR_SIZE 2023-08-24 19:02:24 +03:00
cec5ceab77 Fix buffer insert in cluster_client 2023-08-24 19:02:24 +03:00
dc90faec7e Fix incorrect marking op parts as done with snapshots (could probably lead to client hangs) 2023-08-24 19:02:24 +03:00
20c62a4244 Fix monitor retrying failed etcd connection in an infinite loop without pauses 2023-08-24 19:02:24 +03:00
49 changed files with 534 additions and 1569 deletions

View File

@@ -96,9 +96,8 @@ SSD cache or "media-cache" - for example, a lot of Seagate EXOS drives have
it (they have internal SSD cache even though it's not stated in datasheets).
Setting this parameter to "all" or "small" in OSD parameters requires enabling
[disable_journal_fsync](layout-osd.en.yml#disable_journal_fsync) and
[disable_meta_fsync](layout-osd.en.yml#disable_meta_fsync), setting it to
"all" also requires enabling [disable_data_fsync](layout-osd.en.yml#disable_data_fsync).
disable_journal_fsync and disable_meta_fsync, setting it to "all" also requires
enabling disable_data_fsync.
TLDR: For optimal performance, set immediate_commit to "all" if you only use
SSDs with supercapacitor-based power loss protection (nonvolatile

View File

@@ -103,9 +103,8 @@ HDD-дисках с внутренним SSD или "медиа" кэшем - н
указано в спецификациях).
Указание "all" или "small" в настройках / командной строке OSD требует
включения [disable_journal_fsync](layout-osd.ru.yml#disable_journal_fsync) и
[disable_meta_fsync](layout-osd.ru.yml#disable_meta_fsync), значение "all"
также требует включения [disable_data_fsync](layout-osd.ru.yml#disable_data_fsync).
включения disable_journal_fsync и disable_meta_fsync, значение "all" также
требует включения disable_data_fsync.
Итого, вкратце: для оптимальной производительности установите
immediate_commit в значение "all", если вы используете в кластере только SSD

View File

@@ -213,6 +213,6 @@ Thus, recommended setups are:
3. Hybrid HDD+SSD: csum_block_size=4k + inmemory_metadata=false
4. HDD-only, faster random read: csum_block_size=32k
5. HDD-only, faster random write: csum_block_size=4k +
inmemory_metadata=false + meta_io=cached
inmemory_metadata=false + cached_io_meta=true
See also [meta_io](osd.en.md#meta_io).
See also [cached_io_meta](osd.en.md#cached_io_meta).

View File

@@ -226,6 +226,6 @@ csum_block_size данных.
3. Гибридные HDD+SSD: csum_block_size=4k + inmemory_metadata=false
4. Только HDD, быстрее случайное чтение: csum_block_size=32k
5. Только HDD, быстрее случайная запись: csum_block_size=4k +
inmemory_metadata=false + meta_io=cached
inmemory_metadata=false + cached_io_meta=true
Смотрите также [meta_io](osd.ru.md#meta_io).
Смотрите также [cached_io_meta](osd.ru.md#cached_io_meta).

View File

@@ -31,9 +31,9 @@ them, even without restarting by updating configuration in etcd.
- [max_flusher_count](#max_flusher_count)
- [inmemory_metadata](#inmemory_metadata)
- [inmemory_journal](#inmemory_journal)
- [data_io](#data_io)
- [meta_io](#meta_io)
- [journal_io](#journal_io)
- [cached_io_data](#cached_io_data)
- [cached_io_meta](#cached_io_meta)
- [cached_io_journal](#cached_io_journal)
- [journal_sector_buffer_count](#journal_sector_buffer_count)
- [journal_no_same_sector_overwrites](#journal_no_same_sector_overwrites)
- [throttle_small_writes](#throttle_small_writes)
@@ -258,59 +258,47 @@ is typically very small because it's sufficient to have 16-32 MB journal
for SSD OSDs. However, in theory it's possible that you'll want to turn it
off for hybrid (HDD+SSD) OSDs with large journals on quick devices.
## data_io
## cached_io_data
- Type: string
- Default: direct
- Type: boolean
- Default: false
I/O mode for *data*. One of "direct", "cached" or "directsync". Corresponds
to O_DIRECT, O_SYNC and O_DIRECT|O_SYNC, respectively.
Read and write *data* through Linux page cache, i.e. use a file descriptor
opened with O_SYNC, but without O_DIRECT for I/O. May improve read
performance for hot data and slower disks - HDDs and maybe SATA SSDs.
Not recommended for desktop SSDs without capacitors because O_SYNC flushes
disk cache on every write.
Choose "cached" to use Linux page cache. This may improve read performance
for hot data and slower disks - HDDs and maybe SATA SSDs - but will slightly
decrease write performance for fast disks because page cache is an overhead
itself.
## cached_io_meta
Choose "directsync" to use [immediate_commit](layout-cluster.ru.md#immediate_commit)
(which requires disable_data_fsync) with drives having write-back cache
which can't be turned off, for example, Intel Optane. Also note that *some*
desktop SSDs (for example, HP EX950) may ignore O_SYNC thus making
disable_data_fsync unsafe even with "directsync".
- Type: boolean
- Default: false
## meta_io
Read and write *metadata* through Linux page cache. May improve read
performance only if your drives are relatively slow (HDD, SATA SSD), and
only if checksums are enabled and [inmemory_metadata](#inmemory_metadata)
is disabled, because in this case metadata blocks are read from disk
on every read request to verify checksums and caching them may reduce this
extra read load.
- Type: string
- Default: direct
I/O mode for *metadata*. One of "direct", "cached" or "directsync".
"cached" may improve read performance, but only under the following conditions:
1. your drives are relatively slow (HDD, SATA SSD), and
2. checksums are enabled, and
3. [inmemory_metadata](#inmemory_metadata) is disabled.
Under all these conditions, metadata blocks are read from disk on every
read request to verify checksums and caching them may reduce this extra
read load. Without (3) metadata is never read from the disk after starting,
and without (2) metadata blocks are read from disk only during journal
Absolutely pointless to enable with enabled inmemory_metadata because all
metadata is kept in memory anyway, and likely pointless without checksums,
because in that case, metadata blocks are read from disk only during journal
flushing.
"directsync" is the same as above.
If the same device is used for data and metadata, enabling [cached_io_data](#cached_io_data)
also enables this parameter, given that it isn't turned off explicitly.
If the same device is used for data and metadata, meta_io by default is set
to the same value as [data_io](#data_io).
## cached_io_journal
## journal_io
- Type: boolean
- Default: false
- Type: string
- Default: direct
Read and write *journal* through Linux page cache. May improve read
performance if [inmemory_journal](#inmemory_journal) is turned off.
I/O mode for *journal*. One of "direct", "cached" or "directsync".
Here, "cached" may only improve read performance for recent writes and
only if [inmemory_journal](#inmemory_journal) is turned off.
If the same device is used for metadata and journal, journal_io by default
is set to the same value as [meta_io](#meta_io).
If the same device is used for metadata and journal, enabling [cached_io_meta](#cached_io_meta)
also enables this parameter, given that it isn't turned off explicitly.
## journal_sector_buffer_count

View File

@@ -32,9 +32,9 @@
- [max_flusher_count](#max_flusher_count)
- [inmemory_metadata](#inmemory_metadata)
- [inmemory_journal](#inmemory_journal)
- [data_io](#data_io)
- [meta_io](#meta_io)
- [journal_io](#journal_io)
- [cached_io_data](#cached_io_data)
- [cached_io_meta](#cached_io_meta)
- [cached_io_journal](#cached_io_journal)
- [journal_sector_buffer_count](#journal_sector_buffer_count)
- [journal_no_same_sector_overwrites](#journal_no_same_sector_overwrites)
- [throttle_small_writes](#throttle_small_writes)
@@ -266,62 +266,51 @@ Flusher - это микро-поток (корутина), которая коп
параметра может оказаться полезным для гибридных OSD (HDD+SSD) с большими
журналами, расположенными на быстром по сравнению с HDD устройстве.
## data_io
## cached_io_data
- Тип: строка
- Значение по умолчанию: direct
- Тип: булево (да/нет)
- Значение по умолчанию: false
Режим ввода-вывода для *данных*. Одно из значений "direct", "cached" или
"directsync", означающих O_DIRECT, O_SYNC и O_DIRECT|O_SYNC, соответственно.
Читать и записывать *данные* через системный кэш Linux (page cache), то есть,
использовать для данных файловый дескриптор, открытый без флага O_DIRECT, но
с флагом O_SYNC. Может улучшить скорость чтения для относительно медленных
дисков - HDD и, возможно, SATA SSD. Не рекомендуется для потребительских
SSD без конденсаторов, так как O_SYNC сбрасывает кэш диска при каждой записи.
Выберите "cached", чтобы использовать системный кэш Linux (page cache) при
чтении и записи. Это может улучшить скорость чтения горячих данных с
относительно медленных дисков - HDD и, возможно, SATA SSD - но немного
снижает производительность записи для быстрых дисков, так как кэш сам по
себе тоже добавляет накладные расходы.
## cached_io_meta
Выберите "directsync", если хотите задействовать
[immediate_commit](layout-cluster.ru.md#immediate_commit) (требующий
включенияd disable_data_fsync) на дисках с неотключаемым кэшем. Пример таких
дисков - Intel Optane. При этом также стоит иметь в виду, что *некоторые*
настольные SSD (например, HP EX950) игнорируют флаг O_SYNC, делая отключение
fsync небезопасным даже с режимом "directsync".
- Тип: булево (да/нет)
- Значение по умолчанию: false
## meta_io
- Тип: строка
- Значение по умолчанию: direct
Режим ввода-вывода для *метаданных*. Одно из значений "direct", "cached" или
"directsync".
"cached" может улучшить скорость чтения, если:
1. у вас медленные диски (HDD, SATA SSD)
2. контрольные суммы включены
3. параметр [inmemory_metadata](#inmemory_metadata) отключён.
При этих условиях блоки метаданных читаются с диска при каждом запросе чтения
Читать и записывать *метаданные* через системный кэш Linux. Может улучшить
скорость чтения, если у вас медленные диски, и только если контрольные суммы
включены, а параметр [inmemory_metadata](#inmemory_metadata) отключён, так
как в этом случае блоки метаданных читаются с диска при каждом запросе чтения
для проверки контрольных сумм и их кэширование может снизить дополнительную
нагрузку на диск. Без (3) метаданные никогда не читаются с диска после
запуска OSD, а без (2) блоки метаданных читаются только при сбросе журнала.
нагрузку на диск.
Если одно и то же устройство используется для данных и метаданных, режим
ввода-вывода метаданных по умолчанию устанавливается равным [data_io](#data_io).
Абсолютно бессмысленно включать данный параметр, если параметр
inmemory_metadata включён (по умолчанию это так), и также вероятно
бессмысленно включать его, если не включены контрольные суммы, так как в
этом случае блоки метаданных читаются с диска только во время сброса
журнала.
## journal_io
Если одно и то же устройство используется для данных и метаданных, включение
[cached_io_data](#cached_io_data) также включает данный параметр, при
условии, что он не отключён явным образом.
- Тип: строка
- Значение по умолчанию: direct
## cached_io_journal
Режим ввода-вывода для *журнала*. Одно из значений "direct", "cached" или
"directsync".
- Тип: булево (да/нет)
- Значение по умолчанию: false
Здесь "cached" может улучшить скорость чтения только недавно записанных
данных и только если параметр [inmemory_journal](#inmemory_journal)
Читать и записывать *журнал* через системный кэш Linux. Может улучшить
скорость чтения, если параметр [inmemory_journal](#inmemory_journal)
отключён.
Если одно и то же устройство используется для метаданных и журнала,
режим ввода-вывода журнала по умолчанию устанавливается равным
[meta_io](#meta_io).
включение [cached_io_meta](#cached_io_meta) также включает данный
параметр, при условии, что он не отключён явным образом.
## journal_sector_buffer_count

View File

@@ -205,8 +205,9 @@ This parameter usually doesn't require to be changed.
- Default: 131072
Block size for this pool. The value from /vitastor/config/global is used when
unspecified. Only OSDs with matching block_size are used for each pool. If you
want to further restrict OSDs for the pool, use [osd_tags](#osd_tags).
unspecified. If your cluster has OSDs with different block sizes then pool must
be restricted by [osd_tags](#osd_tags) to only include OSDs with matching block
size.
Read more about this parameter in [Cluster-Wide Disk Layout Parameters](layout-cluster.en.md#block_size).
@@ -215,9 +216,10 @@ Read more about this parameter in [Cluster-Wide Disk Layout Parameters](layout-c
- Type: integer
- Default: 4096
"Sector" size of virtual disks in this pool. The value from /vitastor/config/global
is used when unspecified. Similarly to block_size, only OSDs with matching
bitmap_granularity are used for each pool.
"Sector" size of virtual disks in this pool. The value from
/vitastor/config/global is used when unspecified. Similar to block_size, the
pool must be restricted by [osd_tags](#osd_tags) to only include OSDs with
matching bitmap_granularity.
Read more about this parameter in [Cluster-Wide Disk Layout Parameters](layout-cluster.en.md#bitmap_granularity).
@@ -227,11 +229,10 @@ Read more about this parameter in [Cluster-Wide Disk Layout Parameters](layout-c
- Default: none
Immediate commit setting for this pool. The value from /vitastor/config/global
is used when unspecified. Similarly to block_size, only OSDs with compatible
bitmap_granularity are used for each pool. "Compatible" means that a pool with
non-immediate commit will use OSDs with immediate commit enabled, but not vice
versa. I.e., pools with "none" use all OSDs, pools with "small" only use OSDs
with "all" or "small", and pools with "all" only use OSDs with "all".
is used when unspecified. Similar to block_size, the pool must be restricted by
[osd_tags](#osd_tags) to only include OSDs with compatible immediate_commit.
Compatible means that a pool with non-immediate commit will work with OSDs with
immediate commit enabled, but not vice versa.
Read more about this parameter in [Cluster-Wide Disk Layout Parameters](layout-cluster.en.md#immediate_commit).

View File

@@ -208,9 +208,8 @@ PG в Vitastor эферемерны, то есть вы можете менят
Размер блока для данного пула. Если не задан, используется значение из
/vitastor/config/global. Если в вашем кластере есть OSD с разными размерами
блока, пул будет использовать только OSD с размером блока, равным размеру блока
пула. Если вы хотите сильнее ограничить набор используемых для пула OSD -
используйте [osd_tags](#osd_tags).
блока, пул должен быть ограничен только OSD, блок которых равен блоку пула,
с помощью [osd_tags](#osd_tags).
О самом параметре читайте в разделе [Дисковые параметры уровня кластера](layout-cluster.ru.md#block_size).
@@ -220,8 +219,9 @@ PG в Vitastor эферемерны, то есть вы можете менят
- По умолчанию: 4096
Размер "сектора" виртуальных дисков в данном пуле. Если не задан, используется
значение из /vitastor/config/global. Аналогично block_size, каждый пул будет
использовать только OSD с совпадающей с пулом настройкой bitmap_granularity.
значение из /vitastor/config/global. Аналогично block_size, пул должен быть
ограничен OSD со значением bitmap_granularity, равным значению пула, с помощью
[osd_tags](#osd_tags).
О самом параметре читайте в разделе [Дисковые параметры уровня кластера](layout-cluster.ru.md#bitmap_granularity).
@@ -231,13 +231,11 @@ PG в Vitastor эферемерны, то есть вы можете менят
- По умолчанию: none
Настройка мгновенного коммита для данного пула. Если не задана, используется
значение из /vitastor/config/global. Аналогично block_size, каждый пул будет
использовать только OSD с *совместимыми* настройками immediate_commit.
"Совместимыми" означает, что пул с отключенным мгновенным коммитом будет
использовать OSD с включённым мгновенным коммитом, но не наоборот. То есть,
пул со значением "none" будет использовать все OSD, пул со "small" будет
использовать OSD с "all" или "small", а пул с "all" будет использовать только
OSD с "all".
значение из /vitastor/config/global. Аналогично block_size, пул должен быть
ограничен OSD со значением bitmap_granularity, совместимым со значением пула, с
помощью [osd_tags](#osd_tags). Совместимость означает, что пул с отключенным
мгновенным коммитом может работать на OSD с включённым мгновенным коммитом, но
не наоборот.
О самом параметре читайте в разделе [Дисковые параметры уровня кластера](layout-cluster.ru.md#immediate_commit).

View File

@@ -87,9 +87,8 @@
it (they have internal SSD cache even though it's not stated in datasheets).
Setting this parameter to "all" or "small" in OSD parameters requires enabling
[disable_journal_fsync](layout-osd.en.yml#disable_journal_fsync) and
[disable_meta_fsync](layout-osd.en.yml#disable_meta_fsync), setting it to
"all" also requires enabling [disable_data_fsync](layout-osd.en.yml#disable_data_fsync).
disable_journal_fsync and disable_meta_fsync, setting it to "all" also requires
enabling disable_data_fsync.
TLDR: For optimal performance, set immediate_commit to "all" if you only use
SSDs with supercapacitor-based power loss protection (nonvolatile
@@ -141,9 +140,8 @@
указано в спецификациях).
Указание "all" или "small" в настройках / командной строке OSD требует
включения [disable_journal_fsync](layout-osd.ru.yml#disable_journal_fsync) и
[disable_meta_fsync](layout-osd.ru.yml#disable_meta_fsync), значение "all"
также требует включения [disable_data_fsync](layout-osd.ru.yml#disable_data_fsync).
включения disable_journal_fsync и disable_meta_fsync, значение "all" также
требует включения disable_data_fsync.
Итого, вкратце: для оптимальной производительности установите
immediate_commit в значение "all", если вы используете в кластере только SSD

View File

@@ -244,9 +244,9 @@
3. Hybrid HDD+SSD: csum_block_size=4k + inmemory_metadata=false
4. HDD-only, faster random read: csum_block_size=32k
5. HDD-only, faster random write: csum_block_size=4k +
inmemory_metadata=false + meta_io=cached
inmemory_metadata=false + cached_io_meta=true
See also [meta_io](osd.en.md#meta_io).
See also [cached_io_meta](osd.en.md#cached_io_meta).
info_ru: |
Размер блока расчёта контрольных сумм.
@@ -271,6 +271,6 @@
3. Гибридные HDD+SSD: csum_block_size=4k + inmemory_metadata=false
4. Только HDD, быстрее случайное чтение: csum_block_size=32k
5. Только HDD, быстрее случайная запись: csum_block_size=4k +
inmemory_metadata=false + meta_io=cached
inmemory_metadata=false + cached_io_meta=true
Смотрите также [meta_io](osd.ru.md#meta_io).
Смотрите также [cached_io_meta](osd.ru.md#cached_io_meta).

View File

@@ -260,96 +260,73 @@
достаточно 16- или 32-мегабайтного журнала. Однако в теории отключение
параметра может оказаться полезным для гибридных OSD (HDD+SSD) с большими
журналами, расположенными на быстром по сравнению с HDD устройстве.
- name: data_io
type: string
default: direct
- name: cached_io_data
type: bool
default: false
info: |
I/O mode for *data*. One of "direct", "cached" or "directsync". Corresponds
to O_DIRECT, O_SYNC and O_DIRECT|O_SYNC, respectively.
Choose "cached" to use Linux page cache. This may improve read performance
for hot data and slower disks - HDDs and maybe SATA SSDs - but will slightly
decrease write performance for fast disks because page cache is an overhead
itself.
Choose "directsync" to use [immediate_commit](layout-cluster.ru.md#immediate_commit)
(which requires disable_data_fsync) with drives having write-back cache
which can't be turned off, for example, Intel Optane. Also note that *some*
desktop SSDs (for example, HP EX950) may ignore O_SYNC thus making
disable_data_fsync unsafe even with "directsync".
Read and write *data* through Linux page cache, i.e. use a file descriptor
opened with O_SYNC, but without O_DIRECT for I/O. May improve read
performance for hot data and slower disks - HDDs and maybe SATA SSDs.
Not recommended for desktop SSDs without capacitors because O_SYNC flushes
disk cache on every write.
info_ru: |
Режим ввода-вывода для *данных*. Одно из значений "direct", "cached" или
"directsync", означающих O_DIRECT, O_SYNC и O_DIRECT|O_SYNC, соответственно.
Выберите "cached", чтобы использовать системный кэш Linux (page cache) при
чтении и записи. Это может улучшить скорость чтения горячих данных с
относительно медленных дисков - HDD и, возможно, SATA SSD - но немного
снижает производительность записи для быстрых дисков, так как кэш сам по
себе тоже добавляет накладные расходы.
Выберите "directsync", если хотите задействовать
[immediate_commit](layout-cluster.ru.md#immediate_commit) (требующий
включенияd disable_data_fsync) на дисках с неотключаемым кэшем. Пример таких
дисков - Intel Optane. При этом также стоит иметь в виду, что *некоторые*
настольные SSD (например, HP EX950) игнорируют флаг O_SYNC, делая отключение
fsync небезопасным даже с режимом "directsync".
- name: meta_io
type: string
default: direct
Читать и записывать *данные* через системный кэш Linux (page cache), то есть,
использовать для данных файловый дескриптор, открытый без флага O_DIRECT, но
с флагом O_SYNC. Может улучшить скорость чтения для относительно медленных
дисков - HDD и, возможно, SATA SSD. Не рекомендуется для потребительских
SSD без конденсаторов, так как O_SYNC сбрасывает кэш диска при каждой записи.
- name: cached_io_meta
type: bool
default: false
info: |
I/O mode for *metadata*. One of "direct", "cached" or "directsync".
Read and write *metadata* through Linux page cache. May improve read
performance only if your drives are relatively slow (HDD, SATA SSD), and
only if checksums are enabled and [inmemory_metadata](#inmemory_metadata)
is disabled, because in this case metadata blocks are read from disk
on every read request to verify checksums and caching them may reduce this
extra read load.
"cached" may improve read performance, but only under the following conditions:
1. your drives are relatively slow (HDD, SATA SSD), and
2. checksums are enabled, and
3. [inmemory_metadata](#inmemory_metadata) is disabled.
Under all these conditions, metadata blocks are read from disk on every
read request to verify checksums and caching them may reduce this extra
read load. Without (3) metadata is never read from the disk after starting,
and without (2) metadata blocks are read from disk only during journal
Absolutely pointless to enable with enabled inmemory_metadata because all
metadata is kept in memory anyway, and likely pointless without checksums,
because in that case, metadata blocks are read from disk only during journal
flushing.
"directsync" is the same as above.
If the same device is used for data and metadata, meta_io by default is set
to the same value as [data_io](#data_io).
If the same device is used for data and metadata, enabling [cached_io_data](#cached_io_data)
also enables this parameter, given that it isn't turned off explicitly.
info_ru: |
Режим ввода-вывода для *метаданных*. Одно из значений "direct", "cached" или
"directsync".
"cached" может улучшить скорость чтения, если:
1. у вас медленные диски (HDD, SATA SSD)
2. контрольные суммы включены
3. параметр [inmemory_metadata](#inmemory_metadata) отключён.
При этих условиях блоки метаданных читаются с диска при каждом запросе чтения
Читать и записывать *метаданные* через системный кэш Linux. Может улучшить
скорость чтения, если у вас медленные диски, и только если контрольные суммы
включены, а параметр [inmemory_metadata](#inmemory_metadata) отключён, так
как в этом случае блоки метаданных читаются с диска при каждом запросе чтения
для проверки контрольных сумм и их кэширование может снизить дополнительную
нагрузку на диск. Без (3) метаданные никогда не читаются с диска после
запуска OSD, а без (2) блоки метаданных читаются только при сбросе журнала.
нагрузку на диск.
Если одно и то же устройство используется для данных и метаданных, режим
ввода-вывода метаданных по умолчанию устанавливается равным [data_io](#data_io).
- name: journal_io
type: string
default: direct
Абсолютно бессмысленно включать данный параметр, если параметр
inmemory_metadata включён (по умолчанию это так), и также вероятно
бессмысленно включать его, если не включены контрольные суммы, так как в
этом случае блоки метаданных читаются с диска только во время сброса
журнала.
Если одно и то же устройство используется для данных и метаданных, включение
[cached_io_data](#cached_io_data) также включает данный параметр, при
условии, что он не отключён явным образом.
- name: cached_io_journal
type: bool
default: false
info: |
I/O mode for *journal*. One of "direct", "cached" or "directsync".
Read and write *journal* through Linux page cache. May improve read
performance if [inmemory_journal](#inmemory_journal) is turned off.
Here, "cached" may only improve read performance for recent writes and
only if [inmemory_journal](#inmemory_journal) is turned off.
If the same device is used for metadata and journal, journal_io by default
is set to the same value as [meta_io](#meta_io).
If the same device is used for metadata and journal, enabling [cached_io_meta](#cached_io_meta)
also enables this parameter, given that it isn't turned off explicitly.
info_ru: |
Режим ввода-вывода для *журнала*. Одно из значений "direct", "cached" или
"directsync".
Здесь "cached" может улучшить скорость чтения только недавно записанных
данных и только если параметр [inmemory_journal](#inmemory_journal)
Читать и записывать *журнал* через системный кэш Linux. Может улучшить
скорость чтения, если параметр [inmemory_journal](#inmemory_journal)
отключён.
Если одно и то же устройство используется для метаданных и журнала,
режим ввода-вывода журнала по умолчанию устанавливается равным
[meta_io](#meta_io).
включение [cached_io_meta](#cached_io_meta) также включает данный
параметр, при условии, что он не отключён явным образом.
- name: journal_sector_buffer_count
type: int
default: 32

View File

@@ -78,15 +78,9 @@ const etcd_tree = {
disk_alignment: 4096,
bitmap_granularity: 4096,
immediate_commit: false, // 'all' or 'small'
// client - configurable online
client_max_dirty_bytes: 33554432,
client_max_dirty_ops: 1024,
client_enable_writeback: false,
client_max_buffered_bytes: 33554432,
client_max_buffered_ops: 1024,
client_max_writeback_iodepth: 256,
// client and osd - configurable online
log_level: 0,
client_dirty_limit: 33554432,
peer_connect_interval: 5, // seconds. min: 1
peer_connect_timeout: 5, // seconds. min: 1
osd_idle_timeout: 5, // seconds. min: 1
@@ -1162,33 +1156,6 @@ class Mon
}
}
filter_osds_by_block_layout(flat_tree, block_size, bitmap_granularity, immediate_commit)
{
for (const host in flat_tree)
{
let found = 0;
for (const osd in flat_tree[host])
{
const osd_stat = this.state.osd.stats[osd];
if (osd_stat && (osd_stat.bs_block_size && osd_stat.bs_block_size != block_size ||
osd_stat.bitmap_granularity && osd_stat.bitmap_granularity != bitmap_granularity ||
osd_stat.immediate_commit == 'small' && immediate_commit == 'all' ||
osd_stat.immediate_commit == 'none' && immediate_commit != 'none'))
{
delete flat_tree[host][osd];
}
else
{
found++;
}
}
if (!found)
{
delete flat_tree[host];
}
}
}
get_affinity_osds(pool_cfg, up_osds, osd_tree)
{
let aff_osds = up_osds;
@@ -1249,12 +1216,6 @@ class Mon
pool_tree = pool_tree ? pool_tree.children : [];
pool_tree = LPOptimizer.flatten_tree(pool_tree, levels, pool_cfg.failure_domain, 'osd');
this.filter_osds_by_tags(osd_tree, pool_tree, pool_cfg.osd_tags);
this.filter_osds_by_block_layout(
pool_tree,
pool_cfg.block_size || this.config.block_size || 131072,
pool_cfg.bitmap_granularity || this.config.bitmap_granularity || 4096,
pool_cfg.immediate_commit || this.config.immediate_commit || 'none'
);
// These are for the purpose of building history.osd_sets
const real_prev_pgs = [];
let pg_history = [];

View File

@@ -137,7 +137,6 @@ endif (${WITH_FIO})
add_library(vitastor_client SHARED
cluster_client.cpp
cluster_client_list.cpp
cluster_client_wb.cpp
vitastor_c.cpp
cli_common.cpp
cli_alloc_osd.cpp
@@ -301,7 +300,7 @@ target_link_libraries(test_crc32
add_executable(test_cluster_client
EXCLUDE_FROM_ALL
test_cluster_client.cpp
pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp cluster_client_wb.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
etcd_state_client.cpp timerfd_manager.cpp str_util.cpp ../json11/json11.cpp
)
target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__)

View File

@@ -82,8 +82,3 @@ uint32_t blockstore_t::get_bitmap_granularity()
{
return impl->get_bitmap_granularity();
}
bool blockstore_t::wants_fsync()
{
return impl->wants_fsync();
}

View File

@@ -226,7 +226,4 @@ public:
uint64_t get_journal_size();
uint32_t get_bitmap_granularity();
// Returns true if writing can stall due to a lack of fsync
bool wants_fsync();
};

View File

@@ -45,31 +45,13 @@ void blockstore_disk_t::parse_config(std::map<std::string, std::string> & config
meta_block_size = parse_size(config["meta_block_size"]);
bitmap_granularity = parse_size(config["bitmap_granularity"]);
meta_format = stoull_full(config["meta_format"]);
if (config.find("data_io") == config.end() &&
config.find("meta_io") == config.end() &&
config.find("journal_io") == config.end())
{
bool cached_io_data = config["cached_io_data"] == "true" || config["cached_io_data"] == "yes" || config["cached_io_data"] == "1";
bool cached_io_meta = cached_io_data && (meta_device == data_device || meta_device == "") &&
config.find("cached_io_meta") == config.end() ||
config["cached_io_meta"] == "true" || config["cached_io_meta"] == "yes" || config["cached_io_meta"] == "1";
bool cached_io_journal = cached_io_meta && (journal_device == meta_device || journal_device == "") &&
config.find("cached_io_journal") == config.end() ||
config["cached_io_journal"] == "true" || config["cached_io_journal"] == "yes" || config["cached_io_journal"] == "1";
data_io = cached_io_data ? "cached" : "direct";
meta_io = cached_io_meta ? "cached" : "direct";
journal_io = cached_io_journal ? "cached" : "direct";
}
else
{
data_io = config.find("data_io") != config.end() ? config["data_io"] : "direct";
meta_io = config.find("meta_io") != config.end()
? config["meta_io"]
: (meta_device == data_device || meta_device == "" ? data_io : "direct");
journal_io = config.find("journal_io") != config.end()
? config["journal_io"]
: (journal_device == meta_device || journal_device == "" ? meta_io : "direct");
}
cached_io_data = config["cached_io_data"] == "true" || config["cached_io_data"] == "yes" || config["cached_io_data"] == "1";
cached_io_meta = cached_io_data && (meta_device == data_device || meta_device == "") &&
config.find("cached_io_meta") == config.end() ||
config["cached_io_meta"] == "true" || config["cached_io_meta"] == "yes" || config["cached_io_meta"] == "1";
cached_io_journal = cached_io_meta && (journal_device == meta_device || journal_device == "") &&
config.find("cached_io_journal") == config.end() ||
config["cached_io_journal"] == "true" || config["cached_io_journal"] == "yes" || config["cached_io_journal"] == "1";
if (config["data_csum_type"] == "crc32c")
{
data_csum_type = BLOCKSTORE_CSUM_CRC32C;
@@ -290,19 +272,9 @@ static void check_size(int fd, uint64_t *size, uint64_t *sectsize, std::string n
}
}
static int bs_openmode(const std::string & mode)
{
if (mode == "directsync")
return O_DIRECT|O_SYNC;
else if (mode == "cached")
return O_SYNC;
else
return O_DIRECT;
}
void blockstore_disk_t::open_data()
{
data_fd = open(data_device.c_str(), bs_openmode(data_io) | O_RDWR);
data_fd = open(data_device.c_str(), (cached_io_data ? O_SYNC : O_DIRECT) | O_RDWR);
if (data_fd == -1)
{
throw std::runtime_error("Failed to open data device "+data_device+": "+std::string(strerror(errno)));
@@ -327,9 +299,9 @@ void blockstore_disk_t::open_data()
void blockstore_disk_t::open_meta()
{
if (meta_device != data_device || meta_io != data_io)
if (meta_device != data_device || cached_io_meta != cached_io_data)
{
meta_fd = open(meta_device.c_str(), bs_openmode(meta_io) | O_RDWR);
meta_fd = open(meta_device.c_str(), (cached_io_meta ? O_SYNC : O_DIRECT) | O_RDWR);
if (meta_fd == -1)
{
throw std::runtime_error("Failed to open metadata device "+meta_device+": "+std::string(strerror(errno)));
@@ -365,9 +337,9 @@ void blockstore_disk_t::open_meta()
void blockstore_disk_t::open_journal()
{
if (journal_device != meta_device || journal_io != meta_io)
if (journal_device != meta_device || cached_io_journal != cached_io_meta)
{
journal_fd = open(journal_device.c_str(), bs_openmode(journal_io) | O_RDWR);
journal_fd = open(journal_device.c_str(), (cached_io_journal ? O_SYNC : O_DIRECT) | O_RDWR);
if (journal_fd == -1)
{
throw std::runtime_error("Failed to open journal device "+journal_device+": "+std::string(strerror(errno)));

View File

@@ -31,9 +31,8 @@ struct blockstore_disk_t
uint32_t csum_block_size = 4096;
// By default, Blockstore locks all opened devices exclusively. This option can be used to disable locking
bool disable_flock = false;
// I/O modes for data, metadata and journal: direct or "" = O_DIRECT, cached = O_SYNC, directsync = O_DIRECT|O_SYNC
// O_SYNC without O_DIRECT = use Linux page cache for reads and writes
std::string data_io, meta_io, journal_io;
// Use Linux page cache for reads and writes, i.e. open FDs with O_SYNC instead of O_DIRECT
bool cached_io_data = false, cached_io_meta = false, cached_io_journal = false;
int meta_fd = -1, data_fd = -1, journal_fd = -1;
uint64_t meta_offset, meta_device_sect, meta_device_size, meta_len, meta_format = 0;

View File

@@ -167,7 +167,7 @@ void blockstore_impl_t::loop()
// wait for all big writes to complete, submit data device fsync
// wait for the data device fsync to complete, then submit journal writes for big writes
// then submit an fsync operation
if (0 && has_writes)
if (has_writes)
{
// Can't submit SYNC before previous writes
continue;
@@ -384,10 +384,6 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
ringloop->set_immediate([op]() { std::function<void (blockstore_op_t*)>(op->callback)(op); });
return;
}
if (op->opcode == BS_OP_SYNC)
{
unsynced_queued_ops = 0;
}
init_op(op);
submit_queue.push_back(op);
ringloop->wakeup();
@@ -734,15 +730,3 @@ void blockstore_impl_t::disk_error_abort(const char *op, int retval, int expecte
fprintf(stderr, "Disk %s failed: result is %d, expected %d. Can't continue, sorry :-(\n", op, retval, expected);
exit(1);
}
bool blockstore_impl_t::wants_fsync()
{
if (!unstable_writes.size())
{
return false;
}
uint64_t journal_free_space = journal.next_free < journal.used_start
? (journal.used_start - journal.next_free)
: (journal.len - journal.next_free + journal.used_start - journal.block_size);
return journal_fsync_feedback_limit > 0 && journal.len-journal_free_space >= journal_fsync_feedback_limit;
}

View File

@@ -262,10 +262,6 @@ class blockstore_impl_t
int throttle_target_parallelism = 1;
// Minimum difference in microseconds between target and real execution times to throttle the response
int throttle_threshold_us = 50;
// Maximum writes between automatically added fsync operations
uint64_t autosync_writes = 128;
// Maximum free space in the journal in bytes to start sending fsync feedback to primary OSDs
uint64_t journal_fsync_feedback_limit = 0;
/******* END OF OPTIONS *******/
struct ring_consumer_t ring_consumer;
@@ -277,7 +273,6 @@ class blockstore_impl_t
std::vector<blockstore_op_t*> submit_queue;
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
int unsynced_big_write_count = 0;
int unsynced_queued_ops = 0;
allocator *data_alloc = NULL;
uint8_t *zero_object;
@@ -435,6 +430,4 @@ public:
inline uint64_t get_free_block_count() { return data_alloc->get_free_count(); }
inline uint32_t get_bitmap_granularity() { return dsk.disk_alignment; }
inline uint64_t get_journal_size() { return dsk.journal_len; }
bool wants_fsync();
};

View File

@@ -4,25 +4,6 @@
#include <sys/file.h>
#include "blockstore_impl.h"
static uint64_t parse_fsync_feedback(blockstore_config_t & config, uint64_t journal_len)
{
uint64_t journal_fsync_feedback_limit = 0;
if (config.find("journal_min_free_bytes") == config.end() &&
config.find("journal_min_free_percent") == config.end())
{
journal_fsync_feedback_limit = 90 * journal_len / 100;
}
else
{
journal_fsync_feedback_limit = strtoull(config["journal_min_free_bytes"].c_str(), NULL, 10);
if (!journal_fsync_feedback_limit)
{
journal_fsync_feedback_limit = strtoull(config["journal_min_free_percent"].c_str(), NULL, 10) * journal_len / 100;
}
}
return journal_fsync_feedback_limit;
}
void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
{
// Online-configurable options:
@@ -38,10 +19,6 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
throttle_target_mbs = strtoull(config["throttle_target_mbs"].c_str(), NULL, 10);
throttle_target_parallelism = strtoull(config["throttle_target_parallelism"].c_str(), NULL, 10);
throttle_threshold_us = strtoull(config["throttle_threshold_us"].c_str(), NULL, 10);
if (config.find("autosync_writes") != config.end())
{
autosync_writes = strtoull(config["autosync_writes"].c_str(), NULL, 10);
}
if (!max_flusher_count)
{
max_flusher_count = 256;
@@ -72,8 +49,6 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
}
if (!init)
{
// has to be parsed after dsk.parse_config(), thus repeated here for online update
journal_fsync_feedback_limit = parse_fsync_feedback(config, journal.len);
return;
}
// Offline-configurable options:
@@ -117,7 +92,6 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config, bool init)
config["journal_no_same_sector_overwrites"] == "1" || config["journal_no_same_sector_overwrites"] == "yes";
journal.inmemory = config["inmemory_journal"] != "false" && config["inmemory_journal"] != "0" &&
config["inmemory_journal"] != "no";
journal_fsync_feedback_limit = parse_fsync_feedback(config, journal.len);
// Validate
if (journal.sector_count < 2)
{

View File

@@ -16,6 +16,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
{
if (immediate_commit == IMMEDIATE_ALL)
{
// We can return immediately because sync is only dequeued after all previous writes
op->retval = 0;
FINISH_OP(op);
return 2;

View File

@@ -127,9 +127,8 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
return false;
}
}
bool imm = (op->len < dsk.data_block_size ? (immediate_commit != IMMEDIATE_NONE) : (immediate_commit == IMMEDIATE_ALL));
if (wait_big && !is_del && !deleted && op->len < dsk.data_block_size && !imm ||
!imm && unsynced_queued_ops >= autosync_writes)
if (wait_big && !is_del && !deleted && op->len < dsk.data_block_size &&
immediate_commit != IMMEDIATE_ALL)
{
// Issue an additional sync so that the previous big write can reach the journal
blockstore_op_t *sync_op = new blockstore_op_t;
@@ -140,8 +139,6 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
};
enqueue_op(sync_op);
}
else if (!imm)
unsynced_queued_ops++;
#ifdef BLOCKSTORE_DEBUG
if (is_del)
printf("Delete %lx:%lx v%lu\n", op->oid.inode, op->oid.stripe, op->version);
@@ -289,13 +286,18 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
printf("Restoring %lx:%lx version: v%lu -> v%lu\n", op->oid.inode, op->oid.stripe, op->version, PRIV(op)->real_version);
#endif
auto prev_it = dirty_it;
prev_it--;
if (prev_it->first.oid == op->oid && prev_it->first.version >= PRIV(op)->real_version)
if (prev_it != dirty_db.begin())
{
// Original version is still invalid
// All subsequent writes to the same object must be canceled too
cancel_all_writes(op, dirty_it, -EEXIST);
return 2;
prev_it--;
if (prev_it->first.oid == op->oid && prev_it->first.version >= PRIV(op)->real_version)
{
// Original version is still invalid
// All subsequent writes to the same object must be canceled too
printf("Tried to write %lx:%lx v%lu after delete (old version v%lu), but already have v%lu\n",
op->oid.inode, op->oid.stripe, PRIV(op)->real_version, op->version, prev_it->first.version);
cancel_all_writes(op, dirty_it, -EEXIST);
return 2;
}
}
op->version = PRIV(op)->real_version;
PRIV(op)->real_version = 0;

View File

@@ -349,7 +349,6 @@ static int run(cli_tool_t *p, json11::Json::object cfg)
p->ringloop->wait();
}
// Destroy the client
p->cli->flush();
delete p->cli;
delete p->epmgr;
delete p->ringloop;

View File

@@ -174,7 +174,7 @@ resume_1:
{ "size", 0 },
{ "readonly", false },
{ "pool_id", (uint64_t)INODE_POOL(inode_num) },
{ "pool_name", pool_it == parent->cli->st_cli.pool_config.end()
{ "pool_name", pool_it != parent->cli->st_cli.pool_config.end()
? (pool_it->second.name == "" ? "<Unnamed>" : pool_it->second.name) : "?" },
{ "inode_num", INODE_NO_POOL(inode_num) },
{ "inode_id", inode_num },

View File

@@ -53,7 +53,6 @@ struct snap_merger_t
std::map<inode_t, std::vector<uint64_t>> layer_lists;
std::map<inode_t, uint64_t> layer_block_size;
std::map<inode_t, uint64_t> layer_list_pos;
std::vector<snap_rw_op_t*> continue_rwo, continue_rwo2;
int in_flight = 0;
uint64_t last_fsync_offset = 0;
uint64_t last_written_offset = 0;
@@ -305,12 +304,6 @@ struct snap_merger_t
oit = merge_offsets.begin();
resume_5:
// Now read, overwrite and optionally delete offsets one by one
continue_rwo2.swap(continue_rwo);
for (auto rwo: continue_rwo2)
{
next_write(rwo);
}
continue_rwo2.clear();
while (in_flight < parent->iodepth*parent->parallel_osds &&
oit != merge_offsets.end() && !rwo_error.size())
{
@@ -471,8 +464,7 @@ struct snap_merger_t
rwo->error_offset = op->offset;
rwo->error_read = true;
}
continue_rwo.push_back(rwo);
parent->ringloop->wakeup();
next_write(rwo);
};
parent->cli->execute(op);
}
@@ -552,9 +544,11 @@ struct snap_merger_t
}
// Increment CAS version
rwo->op.version = subop->version;
if (use_cas)
next_write(rwo);
else
autofree_op(rwo);
delete subop;
continue_rwo.push_back(rwo);
parent->ringloop->wakeup();
};
parent->cli->execute(subop);
}

View File

@@ -3,13 +3,21 @@
#include <stdexcept>
#include <assert.h>
#include "cluster_client_impl.h"
#include "http_client.h" // json_is_true
#include "cluster_client.h"
#define SCRAP_BUFFER_SIZE 4*1024*1024
#define PART_SENT 1
#define PART_DONE 2
#define PART_ERROR 4
#define PART_RETRY 8
#define CACHE_DIRTY 1
#define CACHE_FLUSHING 2
#define CACHE_REPEATING 3
#define OP_FLUSH_BUFFER 0x02
#define OP_IMMEDIATE_COMMIT 0x04
cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config)
{
wb = new writeback_cache_t();
cli_config = config.object_items();
file_config = osd_messenger_t::read_config(config);
config = osd_messenger_t::merge_configs(cli_config, file_config, etcd_global_config, {});
@@ -29,14 +37,20 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
continue_lists();
continue_raw_ops(peer_osd);
}
else
else if (dirty_buffers.size())
{
// peer_osd just dropped connection
// determine WHICH dirty_buffers are now obsolete and repeat them
if (wb->repeat_ops_for(this, peer_osd) > 0)
for (auto & wr: dirty_buffers)
{
continue_ops();
if (affects_osd(wr.first.inode, wr.first.stripe, wr.second.len, peer_osd) &&
wr.second.state != CACHE_REPEATING)
{
// FIXME: Flush in larger parts
flush_buffer(wr.first, &wr.second);
}
}
continue_ops();
}
};
msgr.exec_op = [this](osd_op_t *op)
@@ -64,14 +78,16 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
cluster_client_t::~cluster_client_t()
{
msgr.repeer_pgs = [this](osd_num_t){};
for (auto bp: dirty_buffers)
{
free(bp.second.buf);
}
dirty_buffers.clear();
if (ringloop)
{
ringloop->unregister_consumer(&consumer);
}
free(scrap_buffer);
delete wb;
wb = NULL;
}
cluster_op_t::~cluster_op_t()
@@ -120,19 +136,6 @@ void cluster_client_t::init_msgr()
}
}
void cluster_client_t::unshift_op(cluster_op_t *op)
{
op->next = op_queue_head;
if (op_queue_head)
{
op_queue_head->prev = op;
op_queue_head = op;
}
else
op_queue_tail = op_queue_head = op;
inc_wait(op->opcode, op->flags, op->next, 1);
}
void cluster_client_t::calc_wait(cluster_op_t *op)
{
op->prev_wait = 0;
@@ -153,7 +156,7 @@ void cluster_client_t::calc_wait(cluster_op_t *op)
{
for (auto prev = op->prev; prev; prev = prev->prev)
{
if (prev->opcode == OSD_OP_SYNC || prev->opcode == OSD_OP_WRITE && (!(prev->flags & OP_IMMEDIATE_COMMIT) || enable_writeback))
if (prev->opcode == OSD_OP_SYNC || prev->opcode == OSD_OP_WRITE && !(prev->flags & OP_IMMEDIATE_COMMIT))
{
op->prev_wait++;
}
@@ -163,7 +166,21 @@ void cluster_client_t::calc_wait(cluster_op_t *op)
}
else /* if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP) */
{
continue_rw(op);
for (auto prev = op_queue_head; prev && prev != op; prev = prev->next)
{
if (prev->opcode == OSD_OP_WRITE && (prev->flags & OP_FLUSH_BUFFER))
{
op->prev_wait++;
}
else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ ||
prev->opcode == OSD_OP_READ_BITMAP || prev->opcode == OSD_OP_READ_CHAIN_BITMAP)
{
// Flushes are always in the beginning (we're scanning from the beginning of the queue)
break;
}
}
if (!op->prev_wait)
continue_rw(op);
}
}
@@ -174,8 +191,10 @@ void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *n
while (next)
{
auto n2 = next->next;
if (next->opcode == OSD_OP_SYNC && (!(flags & OP_IMMEDIATE_COMMIT) || enable_writeback) ||
next->opcode == OSD_OP_WRITE && (flags & OP_FLUSH_BUFFER) && !(next->flags & OP_FLUSH_BUFFER))
if (next->opcode == OSD_OP_SYNC && !(flags & OP_IMMEDIATE_COMMIT) ||
next->opcode == OSD_OP_WRITE && (flags & OP_FLUSH_BUFFER) && !(next->flags & OP_FLUSH_BUFFER) ||
(next->opcode == OSD_OP_READ || next->opcode == OSD_OP_READ_BITMAP ||
next->opcode == OSD_OP_READ_CHAIN_BITMAP) && (flags & OP_FLUSH_BUFFER))
{
next->prev_wait += inc;
assert(next->prev_wait >= 0);
@@ -226,37 +245,13 @@ void cluster_client_t::erase_op(cluster_op_t *op)
op_queue_tail = op->prev;
op->next = op->prev = NULL;
if (flags & OP_FLUSH_BUFFER)
{
// Completed flushes change writeback buffer states,
// so the callback should be run before inc_wait()
// which may continue following SYNCs, but these SYNCs
// should know about the changed buffer state
// This is ugly but this is the way we do it
std::function<void(cluster_op_t*)>(op->callback)(op);
}
if (!(flags & OP_IMMEDIATE_COMMIT) || enable_writeback)
{
if (!(flags & OP_IMMEDIATE_COMMIT))
inc_wait(opcode, flags, next, -1);
}
// Call callback at the end to avoid inconsistencies in prev_wait
// if the callback adds more operations itself
if (!(flags & OP_FLUSH_BUFFER))
{
// Call callback at the end to avoid inconsistencies in prev_wait
// if the callback adds more operations itself
std::function<void(cluster_op_t*)>(op->callback)(op);
}
if (flags & OP_FLUSH_BUFFER)
{
int i = 0;
while (i < wb->writeback_overflow.size() && wb->writebacks_active < client_max_writeback_iodepth)
{
execute_internal(wb->writeback_overflow[i]);
i++;
}
if (i > 0)
{
wb->writeback_overflow.erase(wb->writeback_overflow.begin(), wb->writeback_overflow.begin()+i);
}
}
}
void cluster_client_t::continue_ops(bool up_retry)
@@ -300,7 +295,6 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & etcd_global_co
{
this->etcd_global_config = etcd_global_config;
config = osd_messenger_t::merge_configs(cli_config, file_config, etcd_global_config, {});
// client_max_dirty_bytes/client_dirty_limit
if (config.find("client_max_dirty_bytes") != config.end())
{
client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value();
@@ -316,34 +310,11 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & etcd_global_co
{
client_max_dirty_bytes = DEFAULT_CLIENT_MAX_DIRTY_BYTES;
}
// client_max_dirty_ops
client_max_dirty_ops = config["client_max_dirty_ops"].uint64_value();
if (!client_max_dirty_ops)
{
client_max_dirty_ops = DEFAULT_CLIENT_MAX_DIRTY_OPS;
}
// client_enable_writeback
enable_writeback = json_is_true(config["client_enable_writeback"]) &&
json_is_true(config["client_writeback_allowed"]);
// client_max_buffered_bytes
client_max_buffered_bytes = config["client_max_buffered_bytes"].uint64_value();
if (!client_max_buffered_bytes)
{
client_max_buffered_bytes = DEFAULT_CLIENT_MAX_BUFFERED_BYTES;
}
// client_max_buffered_ops
client_max_buffered_ops = config["client_max_buffered_ops"].uint64_value();
if (!client_max_buffered_ops)
{
client_max_buffered_ops = DEFAULT_CLIENT_MAX_BUFFERED_OPS;
}
// client_max_writeback_iodepth
client_max_writeback_iodepth = config["client_max_writeback_iodepth"].uint64_value();
if (!client_max_writeback_iodepth)
{
client_max_writeback_iodepth = DEFAULT_CLIENT_MAX_WRITEBACK_IODEPTH;
}
// up_wait_retry_interval
up_wait_retry_interval = config["up_wait_retry_interval"].uint64_value();
if (!up_wait_retry_interval)
{
@@ -403,8 +374,6 @@ void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes
bool cluster_client_t::get_immediate_commit(uint64_t inode)
{
if (enable_writeback)
return false;
pool_id_t pool_id = INODE_POOL(inode);
if (!pool_id)
return true;
@@ -439,41 +408,6 @@ void cluster_client_t::on_ready(std::function<void(void)> fn)
}
}
bool cluster_client_t::flush()
{
if (!ringloop)
{
if (wb->writeback_queue.size())
{
wb->start_writebacks(this, 0);
cluster_op_t *sync = new cluster_op_t;
sync->opcode = OSD_OP_SYNC;
sync->callback = [this](cluster_op_t *sync)
{
delete sync;
};
execute(sync);
}
return op_queue_head == NULL;
}
bool sync_done = false;
cluster_op_t *sync = new cluster_op_t;
sync->opcode = OSD_OP_SYNC;
sync->callback = [this, &sync_done](cluster_op_t *sync)
{
delete sync;
sync_done = true;
};
execute(sync);
while (!sync_done)
{
ringloop->loop();
if (!sync_done)
ringloop->wait();
}
return true;
}
/**
* How writes are synced when immediate_commit is false
*
@@ -494,9 +428,6 @@ bool cluster_client_t::flush()
* 3) if yes, send all SYNCs. otherwise, leave current SYNC as is.
* 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes
* 5) if any of them fail due to other errors, fail the SYNC operation
*
* If writeback caching is turned on and writeback limit is not exhausted:
* data is just copied and the write is confirmed to the client.
*/
void cluster_client_t::execute(cluster_op_t *op)
{
@@ -512,73 +443,67 @@ void cluster_client_t::execute(cluster_op_t *op)
offline_ops.push_back(op);
return;
}
op->flags = op->flags & OSD_OP_IGNORE_READONLY; // the only allowed flag
execute_internal(op);
}
void cluster_client_t::execute_internal(cluster_op_t *op)
{
op->cur_inode = op->inode;
op->retval = 0;
// check alignment, readonly flag and so on
if (!check_rw(op))
op->flags = op->flags & OSD_OP_IGNORE_READONLY; // single allowed flag
if (op->opcode != OSD_OP_SYNC)
{
return;
}
if (op->opcode == OSD_OP_WRITE && enable_writeback && !(op->flags & OP_FLUSH_BUFFER) &&
!op->version /* FIXME no CAS writeback */)
{
if (wb->writebacks_active >= client_max_writeback_iodepth)
pool_id_t pool_id = INODE_POOL(op->cur_inode);
if (!pool_id)
{
// Writeback queue is full, postpone the operation
wb->writeback_overflow.push_back(op);
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
return;
}
// Just copy and acknowledge the operation
wb->copy_write(op, CACHE_DIRTY);
while (wb->writeback_bytes + op->len > client_max_buffered_bytes || wb->writeback_queue_size > client_max_buffered_ops)
auto pool_it = st_cli.pool_config.find(pool_id);
if (pool_it == st_cli.pool_config.end() || pool_it->second.real_pg_count == 0)
{
// Initiate some writeback (asynchronously)
wb->start_writebacks(this, 1);
// Pools are loaded, but this one is unknown
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
return;
}
// Check alignment
if (!op->len && (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP || op->opcode == OSD_OP_WRITE) ||
op->offset % pool_it->second.bitmap_granularity || op->len % pool_it->second.bitmap_granularity)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
return;
}
if (pool_it->second.immediate_commit == IMMEDIATE_ALL)
{
op->flags |= OP_IMMEDIATE_COMMIT;
}
op->retval = op->len;
std::function<void(cluster_op_t*)>(op->callback)(op);
return;
}
if (op->opcode == OSD_OP_WRITE && !(op->flags & OP_IMMEDIATE_COMMIT))
{
if (!(op->flags & OP_FLUSH_BUFFER))
{
wb->copy_write(op, CACHE_WRITTEN);
}
if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops)
{
// Push an extra SYNC operation to flush previous writes
cluster_op_t *sync_op = new cluster_op_t;
sync_op->opcode = OSD_OP_SYNC;
sync_op->flags = OP_FLUSH_BUFFER;
sync_op->callback = [](cluster_op_t* sync_op)
{
delete sync_op;
};
execute_internal(sync_op);
sync_op->prev = op_queue_tail;
if (op_queue_tail)
{
op_queue_tail->next = sync_op;
op_queue_tail = sync_op;
}
else
op_queue_tail = op_queue_head = sync_op;
dirty_bytes = 0;
dirty_ops = 0;
calc_wait(sync_op);
}
dirty_bytes += op->len;
dirty_ops++;
}
else if (op->opcode == OSD_OP_SYNC)
{
// Flush the whole write-back queue first
if (!(op->flags & OP_FLUSH_BUFFER) && wb->writeback_overflow.size() > 0)
{
// Writeback queue is full, postpone the operation
wb->writeback_overflow.push_back(op);
return;
}
if (wb->writeback_queue.size())
{
wb->start_writebacks(this, 0);
}
dirty_bytes = 0;
dirty_ops = 0;
}
@@ -590,7 +515,7 @@ void cluster_client_t::execute_internal(cluster_op_t *op)
}
else
op_queue_tail = op_queue_head = op;
if (!(op->flags & OP_IMMEDIATE_COMMIT) || enable_writeback)
if (!(op->flags & OP_IMMEDIATE_COMMIT))
calc_wait(op);
else
{
@@ -601,52 +526,6 @@ void cluster_client_t::execute_internal(cluster_op_t *op)
}
}
bool cluster_client_t::check_rw(cluster_op_t *op)
{
if (op->opcode == OSD_OP_SYNC)
{
return true;
}
pool_id_t pool_id = INODE_POOL(op->cur_inode);
if (!pool_id)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
return false;
}
auto pool_it = st_cli.pool_config.find(pool_id);
if (pool_it == st_cli.pool_config.end() || pool_it->second.real_pg_count == 0)
{
// Pools are loaded, but this one is unknown
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
return false;
}
// Check alignment
if (!op->len && (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP || op->opcode == OSD_OP_WRITE) ||
op->offset % pool_it->second.bitmap_granularity || op->len % pool_it->second.bitmap_granularity)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
return false;
}
if (pool_it->second.immediate_commit == IMMEDIATE_ALL)
{
op->flags |= OP_IMMEDIATE_COMMIT;
}
if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_DELETE) && !(op->flags & OSD_OP_IGNORE_READONLY))
{
auto ino_it = st_cli.inode_config.find(op->inode);
if (ino_it != st_cli.inode_config.end() && ino_it->second.readonly)
{
op->retval = -EROFS;
std::function<void(cluster_op_t*)>(op->callback)(op);
return false;
}
}
return true;
}
void cluster_client_t::execute_raw(osd_num_t osd_num, osd_op_t *op)
{
auto fd_it = msgr.osd_peer_fds.find(osd_num);
@@ -664,6 +543,114 @@ void cluster_client_t::execute_raw(osd_num_t osd_num, osd_op_t *op)
}
}
void cluster_client_t::copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers)
{
// Save operation for replay when one of PGs goes out of sync
// (primary OSD drops our connection in this case)
auto dirty_it = dirty_buffers.lower_bound((object_id){
.inode = op->inode,
.stripe = op->offset,
});
while (dirty_it != dirty_buffers.begin())
{
dirty_it--;
if (dirty_it->first.inode != op->inode ||
(dirty_it->first.stripe + dirty_it->second.len) <= op->offset)
{
dirty_it++;
break;
}
}
uint64_t pos = op->offset, len = op->len, iov_idx = 0, iov_pos = 0;
while (len > 0)
{
uint64_t new_len = 0;
if (dirty_it == dirty_buffers.end() || dirty_it->first.inode != op->inode)
{
new_len = len;
}
else if (dirty_it->first.stripe > pos)
{
new_len = dirty_it->first.stripe - pos;
if (new_len > len)
{
new_len = len;
}
}
if (new_len > 0)
{
dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){
.inode = op->inode,
.stripe = pos,
}, (cluster_buffer_t){
.buf = malloc_or_die(new_len),
.len = new_len,
});
}
// FIXME: Split big buffers into smaller ones on overwrites. But this will require refcounting
dirty_it->second.state = CACHE_DIRTY;
uint64_t cur_len = (dirty_it->first.stripe + dirty_it->second.len - pos);
if (cur_len > len)
{
cur_len = len;
}
while (cur_len > 0 && iov_idx < op->iov.count)
{
unsigned iov_len = (op->iov.buf[iov_idx].iov_len - iov_pos);
if (iov_len <= cur_len)
{
memcpy((uint8_t*)dirty_it->second.buf + pos - dirty_it->first.stripe,
(uint8_t*)op->iov.buf[iov_idx].iov_base + iov_pos, iov_len);
pos += iov_len;
len -= iov_len;
cur_len -= iov_len;
iov_pos = 0;
iov_idx++;
}
else
{
memcpy((uint8_t*)dirty_it->second.buf + pos - dirty_it->first.stripe,
(uint8_t*)op->iov.buf[iov_idx].iov_base + iov_pos, cur_len);
pos += cur_len;
len -= cur_len;
iov_pos += cur_len;
cur_len = 0;
}
}
dirty_it++;
}
}
void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr)
{
wr->state = CACHE_REPEATING;
cluster_op_t *op = new cluster_op_t;
op->flags = OSD_OP_IGNORE_READONLY|OP_FLUSH_BUFFER;
op->opcode = OSD_OP_WRITE;
op->cur_inode = op->inode = oid.inode;
op->offset = oid.stripe;
op->len = wr->len;
op->iov.push_back(wr->buf, wr->len);
op->callback = [wr](cluster_op_t* op)
{
if (wr->state == CACHE_REPEATING)
{
wr->state = CACHE_DIRTY;
}
delete op;
};
op->next = op_queue_head;
if (op_queue_head)
{
op_queue_head->prev = op;
op_queue_head = op;
}
else
op_queue_tail = op_queue_head = op;
inc_wait(op->opcode, op->flags, op->next, 1);
continue_rw(op);
}
int cluster_client_t::continue_rw(cluster_op_t *op)
{
if (op->state == 0)
@@ -672,7 +659,27 @@ int cluster_client_t::continue_rw(cluster_op_t *op)
goto resume_1;
else if (op->state == 2)
goto resume_2;
else if (op->state == 3)
goto resume_3;
resume_0:
if (op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_DELETE)
{
if (!(op->flags & OSD_OP_IGNORE_READONLY))
{
auto ino_it = st_cli.inode_config.find(op->inode);
if (ino_it != st_cli.inode_config.end() && ino_it->second.readonly)
{
op->retval = -EINVAL;
erase_op(op);
return 1;
}
}
if (op->opcode == OSD_OP_WRITE && !(op->flags & OP_IMMEDIATE_COMMIT) && !(op->flags & OP_FLUSH_BUFFER))
{
copy_write(op, dirty_buffers);
}
}
resume_1:
// Slice the operation into parts
slice_rw(op);
op->needs_reslice = false;
@@ -683,9 +690,9 @@ resume_0:
erase_op(op);
return 1;
}
resume_1:
resume_2:
// Send unsent parts, if they're not subject to change
op->state = 2;
op->state = 3;
if (op->needs_reslice)
{
for (int i = 0; i < op->parts.size(); i++)
@@ -695,7 +702,7 @@ resume_1:
op->retval = -EPIPE;
}
}
goto resume_2;
goto resume_3;
}
for (int i = 0; i < op->parts.size(); i++)
{
@@ -716,18 +723,18 @@ resume_1:
});
}
}
op->state = 1;
op->state = 2;
}
}
}
if (op->state == 1)
if (op->state == 2)
{
return 0;
}
resume_2:
resume_3:
if (op->inflight_count > 0)
{
op->state = 2;
op->state = 3;
return 0;
}
if (op->done_count >= op->parts.size())
@@ -755,7 +762,7 @@ resume_2:
op->cur_inode = ino_it->second.parent_id;
op->parts.clear();
op->done_count = 0;
goto resume_0;
goto resume_1;
}
}
op->retval = op->len;
@@ -767,8 +774,7 @@ resume_2:
erase_op(op);
return 1;
}
else if (op->retval != 0 && !(op->flags & OP_FLUSH_BUFFER) &&
op->retval != -EPIPE && op->retval != -EIO && op->retval != -ENOSPC)
else if (op->retval != 0 && op->retval != -EPIPE && op->retval != -EIO && op->retval != -ENOSPC)
{
// Fatal error (neither -EPIPE, -EIO nor -ENOSPC)
// FIXME: Add a parameter to allow to not wait for EIOs (incomplete or corrupted objects) to heal
@@ -783,7 +789,7 @@ resume_2:
{
op->parts.clear();
op->done_count = 0;
goto resume_0;
goto resume_1;
}
else
{
@@ -794,7 +800,7 @@ resume_2:
op->parts[i].flags = PART_RETRY;
}
}
goto resume_1;
goto resume_2;
}
}
return 0;
@@ -868,11 +874,6 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
int iov_idx = 0;
size_t iov_pos = 0;
int i = 0;
// We also have to return reads from CACHE_REPEATING buffers - they are not
// guaranteed to be present on target OSDs at the moment of repeating
// And we're also free to return data from other cached buffers just
// because it's faster
bool dirty_copied = wb->read_from_cache(op, pool_cfg.bitmap_granularity);
for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
{
pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
@@ -881,7 +882,7 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
? (stripe + pg_block_size) : (op->offset + op->len);
op->parts[i].iov.reset();
op->parts[i].flags = 0;
if (op->cur_inode != op->inode || op->opcode == OSD_OP_READ && dirty_copied)
if (op->cur_inode != op->inode)
{
// Read remaining parts from upper layers
uint64_t prev = begin, cur = begin;
@@ -1044,7 +1045,13 @@ int cluster_client_t::continue_sync(cluster_op_t *op)
do_it++;
}
// Post sync to affected OSDs
wb->fsync_start();
for (auto & prev_op: dirty_buffers)
{
if (prev_op.second.state == CACHE_DIRTY)
{
prev_op.second.state = CACHE_FLUSHING;
}
}
op->parts.resize(dirty_osds.size());
op->retval = 0;
{
@@ -1069,7 +1076,13 @@ resume_1:
}
if (op->retval != 0)
{
wb->fsync_error();
for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); uw_it++)
{
if (uw_it->second.state == CACHE_FLUSHING)
{
uw_it->second.state = CACHE_DIRTY;
}
}
if (op->retval == -EPIPE || op->retval == -EIO || op->retval == -ENOSPC)
{
// Retry later
@@ -1083,7 +1096,16 @@ resume_1:
}
else
{
wb->fsync_ok();
for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); )
{
if (uw_it->second.state == CACHE_FLUSHING)
{
free(uw_it->second.buf);
dirty_buffers.erase(uw_it++);
}
else
uw_it++;
}
}
erase_op(op);
return 1;

View File

@@ -8,9 +8,6 @@
#define DEFAULT_CLIENT_MAX_DIRTY_BYTES 32*1024*1024
#define DEFAULT_CLIENT_MAX_DIRTY_OPS 1024
#define DEFAULT_CLIENT_MAX_BUFFERED_BYTES 32*1024*1024
#define DEFAULT_CLIENT_MAX_BUFFERED_OPS 1024
#define DEFAULT_CLIENT_MAX_WRITEBACK_IODEPTH 256
#define INODE_LIST_DONE 1
#define INODE_LIST_HAS_UNSTABLE 2
#define OSD_OP_READ_BITMAP OSD_OP_SEC_READ_BMP
@@ -67,12 +64,17 @@ protected:
cluster_op_t *prev = NULL, *next = NULL;
int prev_wait = 0;
friend class cluster_client_t;
friend class writeback_cache_t;
};
struct cluster_buffer_t
{
void *buf;
uint64_t len;
int state;
};
struct inode_list_t;
struct inode_list_osd_t;
class writeback_cache_t;
// FIXME: Split into public and private interfaces
class cluster_client_t
@@ -81,23 +83,16 @@ class cluster_client_t
ring_loop_t *ringloop;
std::map<pool_id_t, uint64_t> pg_counts;
// client_max_dirty_* is actually "max unsynced", for the case when immediate_commit is off
// FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory.
uint64_t client_max_dirty_bytes = 0;
uint64_t client_max_dirty_ops = 0;
// writeback improves (1) small consecutive writes and (2) Q1 writes without fsync
bool enable_writeback = false;
// client_max_buffered_* is the real "dirty limit" - maximum amount of writes buffered in memory
uint64_t client_max_buffered_bytes = 0;
uint64_t client_max_buffered_ops = 0;
uint64_t client_max_writeback_iodepth = 0;
int log_level;
int up_wait_retry_interval = 500; // ms
int retry_timeout_id = 0;
std::vector<cluster_op_t*> offline_ops;
cluster_op_t *op_queue_head = NULL, *op_queue_tail = NULL;
writeback_cache_t *wb = NULL;
std::map<object_id, cluster_buffer_t> dirty_buffers;
std::set<osd_num_t> dirty_osds;
uint64_t dirty_bytes = 0, dirty_ops = 0;
@@ -127,10 +122,10 @@ public:
void execute_raw(osd_num_t osd_num, osd_op_t *op);
bool is_ready();
void on_ready(std::function<void(void)> fn);
bool flush();
bool get_immediate_commit(uint64_t inode);
static void copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers);
void continue_ops(bool up_retry = false);
inode_list_t *list_inode_start(inode_t inode,
std::function<void(inode_list_t* lst, std::set<object_id>&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback);
@@ -143,14 +138,12 @@ public:
protected:
bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd);
void flush_buffer(const object_id & oid, cluster_buffer_t *wr);
void on_load_config_hook(json11::Json::object & config);
void on_load_pgs_hook(bool success);
void on_change_hook(std::map<std::string, etcd_kv_t> & changes);
void on_change_osd_state_hook(uint64_t peer_osd);
void execute_internal(cluster_op_t *op);
void unshift_op(cluster_op_t *op);
int continue_rw(cluster_op_t *op);
bool check_rw(cluster_op_t *op);
void slice_rw(cluster_op_t *op);
bool try_send(cluster_op_t *op, int i);
int continue_sync(cluster_op_t *op);
@@ -164,6 +157,4 @@ protected:
void continue_listing(inode_list_t *lst);
void send_list(inode_list_osd_t *cur_list);
void continue_raw_ops(osd_num_t peer_osd);
friend class writeback_cache_t;
};

View File

@@ -1,57 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#pragma once
#include "cluster_client.h"
#define SCRAP_BUFFER_SIZE 4*1024*1024
#define PART_SENT 1
#define PART_DONE 2
#define PART_ERROR 4
#define PART_RETRY 8
#define CACHE_DIRTY 1
#define CACHE_WRITTEN 2
#define CACHE_FLUSHING 3
#define CACHE_REPEATING 4
#define OP_FLUSH_BUFFER 0x02
#define OP_IMMEDIATE_COMMIT 0x04
struct cluster_buffer_t
{
uint8_t *buf;
uint64_t len;
int state;
uint64_t flush_id;
uint64_t *refcnt;
};
typedef std::map<object_id, cluster_buffer_t>::iterator dirty_buf_it_t;
class writeback_cache_t
{
public:
uint64_t writeback_bytes = 0;
int writeback_queue_size = 0;
int writebacks_active = 0;
uint64_t last_flush_id = 0;
std::map<object_id, cluster_buffer_t> dirty_buffers;
std::vector<cluster_op_t*> writeback_overflow;
std::vector<object_id> writeback_queue;
std::multimap<uint64_t, uint64_t*> flushed_buffers; // flush_id => refcnt
~writeback_cache_t();
dirty_buf_it_t find_dirty(uint64_t inode, uint64_t offset);
bool is_left_merged(dirty_buf_it_t dirty_it);
bool is_right_merged(dirty_buf_it_t dirty_it);
bool is_merged(const dirty_buf_it_t & dirty_it);
void copy_write(cluster_op_t *op, int state);
int repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd);
void start_writebacks(cluster_client_t *cli, int count);
bool read_from_cache(cluster_op_t *op, uint32_t bitmap_granularity);
void flush_buffers(cluster_client_t *cli, dirty_buf_it_t from_it, dirty_buf_it_t to_it);
void fsync_start();
void fsync_error();
void fsync_ok();
};

View File

@@ -1,498 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <cassert>
#include "cluster_client_impl.h"
writeback_cache_t::~writeback_cache_t()
{
for (auto & bp: dirty_buffers)
{
if (!--(*bp.second.refcnt))
{
free(bp.second.refcnt); // refcnt is allocated with the buffer
}
}
dirty_buffers.clear();
}
dirty_buf_it_t writeback_cache_t::find_dirty(uint64_t inode, uint64_t offset)
{
auto dirty_it = dirty_buffers.lower_bound((object_id){
.inode = inode,
.stripe = offset,
});
while (dirty_it != dirty_buffers.begin())
{
dirty_it--;
if (dirty_it->first.inode != inode ||
(dirty_it->first.stripe + dirty_it->second.len) <= offset)
{
dirty_it++;
break;
}
}
return dirty_it;
}
bool writeback_cache_t::is_left_merged(dirty_buf_it_t dirty_it)
{
if (dirty_it != dirty_buffers.begin())
{
auto prev_it = dirty_it;
prev_it--;
if (prev_it->first.inode == dirty_it->first.inode &&
prev_it->first.stripe+prev_it->second.len == dirty_it->first.stripe &&
prev_it->second.state == CACHE_DIRTY)
{
return true;
}
}
return false;
}
bool writeback_cache_t::is_right_merged(dirty_buf_it_t dirty_it)
{
auto next_it = dirty_it;
next_it++;
if (next_it != dirty_buffers.end() &&
next_it->first.inode == dirty_it->first.inode &&
next_it->first.stripe == dirty_it->first.stripe+dirty_it->second.len &&
next_it->second.state == CACHE_DIRTY)
{
return true;
}
return false;
}
bool writeback_cache_t::is_merged(const dirty_buf_it_t & dirty_it)
{
return is_left_merged(dirty_it) || is_right_merged(dirty_it);
}
void writeback_cache_t::copy_write(cluster_op_t *op, int state)
{
// Save operation for replay when one of PGs goes out of sync
// (primary OSD drops our connection in this case)
// ...or just save it for writeback if write buffering is enabled
if (op->len == 0)
{
return;
}
auto dirty_it = find_dirty(op->inode, op->offset);
auto new_end = op->offset + op->len;
while (dirty_it != dirty_buffers.end() &&
dirty_it->first.inode == op->inode &&
dirty_it->first.stripe < op->offset+op->len)
{
assert(dirty_it->first.stripe + dirty_it->second.len > op->offset);
// Remove overlapping part(s) of buffers
auto old_end = dirty_it->first.stripe + dirty_it->second.len;
if (dirty_it->first.stripe < op->offset)
{
if (old_end > new_end)
{
// Split into end and start
dirty_it->second.len = op->offset - dirty_it->first.stripe;
dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){
.inode = op->inode,
.stripe = new_end,
}, (cluster_buffer_t){
.buf = dirty_it->second.buf + new_end - dirty_it->first.stripe,
.len = old_end - new_end,
.state = dirty_it->second.state,
.flush_id = dirty_it->second.flush_id,
.refcnt = dirty_it->second.refcnt,
});
(*dirty_it->second.refcnt)++;
if (dirty_it->second.state == CACHE_DIRTY)
{
writeback_bytes -= op->len;
writeback_queue_size++;
}
break;
}
else
{
// Only leave the beginning
if (dirty_it->second.state == CACHE_DIRTY)
{
writeback_bytes -= old_end - op->offset;
if (is_left_merged(dirty_it) && !is_right_merged(dirty_it))
{
writeback_queue_size++;
}
}
dirty_it->second.len = op->offset - dirty_it->first.stripe;
dirty_it++;
}
}
else if (old_end > new_end)
{
// Only leave the end
if (dirty_it->second.state == CACHE_DIRTY)
{
writeback_bytes -= new_end - dirty_it->first.stripe;
if (!is_left_merged(dirty_it) && is_right_merged(dirty_it))
{
writeback_queue_size++;
}
}
auto new_dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){
.inode = op->inode,
.stripe = new_end,
}, (cluster_buffer_t){
.buf = dirty_it->second.buf + new_end - dirty_it->first.stripe,
.len = old_end - new_end,
.state = dirty_it->second.state,
.flush_id = dirty_it->second.flush_id,
.refcnt = dirty_it->second.refcnt,
});
dirty_buffers.erase(dirty_it);
dirty_it = new_dirty_it;
break;
}
else
{
// Remove the whole buffer
if (dirty_it->second.state == CACHE_DIRTY && !is_merged(dirty_it))
{
writeback_bytes -= dirty_it->second.len;
assert(writeback_queue_size > 0);
writeback_queue_size--;
}
if (!--(*dirty_it->second.refcnt))
{
free(dirty_it->second.refcnt);
}
dirty_buffers.erase(dirty_it++);
}
}
// Overlapping buffers are removed, just insert the new one
uint64_t *refcnt = (uint64_t*)malloc_or_die(sizeof(uint64_t) + op->len);
uint8_t *buf = (uint8_t*)refcnt + sizeof(uint64_t);
*refcnt = 1;
dirty_it = dirty_buffers.emplace_hint(dirty_it, (object_id){
.inode = op->inode,
.stripe = op->offset,
}, (cluster_buffer_t){
.buf = buf,
.len = op->len,
.state = state,
.refcnt = refcnt,
});
if (state == CACHE_DIRTY)
{
writeback_bytes += op->len;
// Track consecutive write-back operations
if (!is_merged(dirty_it))
{
// <writeback_queue> is OK to contain more than actual number of consecutive
// requests as long as it doesn't miss anything. But <writeback_queue_size>
// is always calculated correctly.
writeback_queue_size++;
writeback_queue.push_back((object_id){
.inode = op->inode,
.stripe = op->offset,
});
}
}
uint64_t pos = 0, len = op->len, iov_idx = 0;
while (len > 0 && iov_idx < op->iov.count)
{
auto & iov = op->iov.buf[iov_idx];
memcpy(buf + pos, iov.iov_base, iov.iov_len);
pos += iov.iov_len;
iov_idx++;
}
}
int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd)
{
int repeated = 0;
if (dirty_buffers.size())
{
// peer_osd just dropped connection
// determine WHICH dirty_buffers are now obsolete and repeat them
for (auto wr_it = dirty_buffers.begin(), flush_it = wr_it, last_it = wr_it; ; )
{
bool end = wr_it == dirty_buffers.end();
bool flush_this = !end && wr_it->second.state != CACHE_REPEATING &&
cli->affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd);
if (flush_it != wr_it && (end || !flush_this ||
wr_it->first.inode != flush_it->first.inode ||
wr_it->first.stripe != last_it->first.stripe+last_it->second.len))
{
repeated++;
flush_buffers(cli, flush_it, wr_it);
flush_it = wr_it;
}
if (end)
break;
last_it = wr_it;
wr_it++;
if (!flush_this)
flush_it = wr_it;
}
}
return repeated;
}
void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from_it, dirty_buf_it_t to_it)
{
auto prev_it = to_it;
prev_it--;
bool is_writeback = from_it->second.state == CACHE_DIRTY;
cluster_op_t *op = new cluster_op_t;
op->flags = OSD_OP_IGNORE_READONLY|OP_FLUSH_BUFFER;
op->opcode = OSD_OP_WRITE;
op->cur_inode = op->inode = from_it->first.inode;
op->offset = from_it->first.stripe;
op->len = prev_it->first.stripe + prev_it->second.len - from_it->first.stripe;
uint32_t calc_len = 0;
uint64_t flush_id = ++last_flush_id;
for (auto it = from_it; it != to_it; it++)
{
it->second.state = CACHE_REPEATING;
it->second.flush_id = flush_id;
(*it->second.refcnt)++;
flushed_buffers.emplace(flush_id, it->second.refcnt);
op->iov.push_back(it->second.buf, it->second.len);
calc_len += it->second.len;
}
assert(calc_len == op->len);
writebacks_active++;
op->callback = [this, cli, flush_id](cluster_op_t* op)
{
// Buffer flushes should be always retried, regardless of the error,
// so they should never result in an error here
assert(op->retval == op->len);
for (auto fl_it = flushed_buffers.find(flush_id);
fl_it != flushed_buffers.end() && fl_it->first == flush_id; )
{
if (!--(*fl_it->second)) // refcnt
{
free(fl_it->second);
}
flushed_buffers.erase(fl_it++);
}
for (auto dirty_it = find_dirty(op->inode, op->offset);
dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->inode &&
dirty_it->first.stripe < op->offset+op->len; dirty_it++)
{
if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING)
{
dirty_it->second.flush_id = 0;
dirty_it->second.state = CACHE_WRITTEN;
}
}
delete op;
writebacks_active--;
// We can't call execute_internal because it affects an invalid copy of the list here
// (erase_op remembers `next` after writeback callback)
};
if (is_writeback)
{
cli->execute_internal(op);
}
else
{
// Insert repeated flushes into the beginning
cli->unshift_op(op);
cli->continue_rw(op);
}
}
void writeback_cache_t::start_writebacks(cluster_client_t *cli, int count)
{
if (!writeback_queue.size())
{
return;
}
std::vector<object_id> queue_copy;
queue_copy.swap(writeback_queue);
int started = 0, i = 0;
for (i = 0; i < queue_copy.size() && (!count || started < count); i++)
{
object_id & req = queue_copy[i];
auto dirty_it = find_dirty(req.inode, req.stripe);
if (dirty_it == dirty_buffers.end() ||
dirty_it->first.inode != req.inode ||
dirty_it->second.state != CACHE_DIRTY)
{
continue;
}
auto from_it = dirty_it;
uint64_t off = dirty_it->first.stripe;
while (from_it != dirty_buffers.begin())
{
from_it--;
if (from_it->second.state != CACHE_DIRTY ||
from_it->first.inode != req.inode ||
from_it->first.stripe+from_it->second.len != off)
{
from_it++;
break;
}
off = from_it->first.stripe;
}
off = dirty_it->first.stripe + dirty_it->second.len;
auto to_it = dirty_it;
to_it++;
while (to_it != dirty_buffers.end())
{
if (to_it->second.state != CACHE_DIRTY ||
to_it->first.inode != req.inode ||
to_it->first.stripe != off)
{
break;
}
off = to_it->first.stripe + to_it->second.len;
to_it++;
}
started++;
assert(writeback_queue_size > 0);
writeback_queue_size--;
writeback_bytes -= off - from_it->first.stripe;
flush_buffers(cli, from_it, to_it);
}
queue_copy.erase(queue_copy.begin(), queue_copy.begin()+i);
if (writeback_queue.size())
{
queue_copy.insert(queue_copy.end(), writeback_queue.begin(), writeback_queue.end());
}
queue_copy.swap(writeback_queue);
}
static void copy_to_op(cluster_op_t *op, uint64_t offset, uint8_t *buf, uint64_t len, uint32_t bitmap_granularity)
{
if (op->opcode == OSD_OP_READ)
{
// Not OSD_OP_READ_BITMAP or OSD_OP_READ_CHAIN_BITMAP
int iov_idx = 0;
uint64_t cur_offset = op->offset;
while (iov_idx < op->iov.count && cur_offset+op->iov.buf[iov_idx].iov_len <= offset)
{
cur_offset += op->iov.buf[iov_idx].iov_len;
iov_idx++;
}
while (iov_idx < op->iov.count && cur_offset < offset+len)
{
auto & v = op->iov.buf[iov_idx];
auto begin = (cur_offset < offset ? offset : cur_offset);
auto end = (cur_offset+v.iov_len > offset+len ? offset+len : cur_offset+v.iov_len);
memcpy(
v.iov_base + begin - cur_offset,
buf + (cur_offset <= offset ? 0 : cur_offset-offset),
end - begin
);
cur_offset += v.iov_len;
iov_idx++;
}
}
// Set bitmap bits
int start_bit = (offset-op->offset)/bitmap_granularity;
int end_bit = (offset-op->offset+len)/bitmap_granularity;
for (int bit = start_bit; bit < end_bit;)
{
if (!(bit%8) && bit <= end_bit-8)
{
((uint8_t*)op->bitmap_buf)[bit/8] = 0xFF;
bit += 8;
}
else
{
((uint8_t*)op->bitmap_buf)[bit/8] |= (1 << (bit%8));
bit++;
}
}
}
bool writeback_cache_t::read_from_cache(cluster_op_t *op, uint32_t bitmap_granularity)
{
bool dirty_copied = false;
if (dirty_buffers.size() && (op->opcode == OSD_OP_READ ||
op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_READ_CHAIN_BITMAP))
{
// We also have to return reads from CACHE_REPEATING buffers - they are not
// guaranteed to be present on target OSDs at the moment of repeating
// And we're also free to return data from other cached buffers just
// because it's faster
auto dirty_it = find_dirty(op->cur_inode, op->offset);
while (dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->cur_inode &&
dirty_it->first.stripe < op->offset+op->len)
{
uint64_t begin = dirty_it->first.stripe, end = dirty_it->first.stripe + dirty_it->second.len;
if (begin < op->offset)
begin = op->offset;
if (end > op->offset+op->len)
end = op->offset+op->len;
bool skip_prev = true;
uint64_t cur = begin, prev = begin;
while (cur < end)
{
unsigned bmp_loc = (cur - op->offset)/bitmap_granularity;
bool skip = (((*((uint8_t*)op->bitmap_buf + bmp_loc/8)) >> (bmp_loc%8)) & 0x1);
if (skip_prev != skip)
{
if (cur > prev && !skip)
{
// Copy data
dirty_copied = true;
copy_to_op(op, prev, dirty_it->second.buf + prev - dirty_it->first.stripe, cur-prev, bitmap_granularity);
}
skip_prev = skip;
prev = cur;
}
cur += bitmap_granularity;
}
assert(cur > prev);
if (!skip_prev)
{
// Copy data
dirty_copied = true;
copy_to_op(op, prev, dirty_it->second.buf + prev - dirty_it->first.stripe, cur-prev, bitmap_granularity);
}
dirty_it++;
}
}
return dirty_copied;
}
void writeback_cache_t::fsync_start()
{
for (auto & prev_op: dirty_buffers)
{
if (prev_op.second.state == CACHE_WRITTEN)
{
prev_op.second.state = CACHE_FLUSHING;
}
}
}
void writeback_cache_t::fsync_error()
{
for (auto & prev_op: dirty_buffers)
{
if (prev_op.second.state == CACHE_FLUSHING)
{
prev_op.second.state = CACHE_WRITTEN;
}
}
}
void writeback_cache_t::fsync_ok()
{
for (auto uw_it = dirty_buffers.begin(); uw_it != dirty_buffers.end(); )
{
if (uw_it->second.state == CACHE_FLUSHING)
{
if (!--(*uw_it->second.refcnt))
free(uw_it->second.refcnt);
dirty_buffers.erase(uw_it++);
}
else
uw_it++;
}
}

View File

@@ -74,7 +74,7 @@ static const char *help_text =
" If it doesn't succeed it issues a warning in the system log.\n"
" \n"
" You can also pass other OSD options here as arguments and they'll be persisted\n"
" in the superblock: data_io, meta_io, journal_io,\n"
" in the superblock: cached_io_data, cached_io_meta, cached_io_journal,\n"
" inmemory_metadata, inmemory_journal, max_write_iodepth,\n"
" min_flusher_count, max_flusher_count, journal_sector_buffer_count,\n"
" journal_no_same_sector_overwrites, throttle_small_writes, throttle_target_iops,\n"

View File

@@ -8,9 +8,9 @@
int disk_tool_t::prepare_one(std::map<std::string, std::string> options, int is_hdd)
{
static const char *allow_additional_params[] = {
"data_io",
"meta_io",
"journal_io",
"cached_io_data",
"cached_io_meta",
"cached_io_journal",
"max_write_iodepth",
"max_write_iodepth",
"min_flusher_count",
@@ -119,7 +119,7 @@ int disk_tool_t::prepare_one(std::map<std::string, std::string> options, int is_
try
{
dsk.parse_config(options);
dsk.data_io = dsk.meta_io = dsk.journal_io = "direct";
dsk.cached_io_data = dsk.cached_io_meta = dsk.cached_io_journal = false;
dsk.open_data();
dsk.open_meta();
dsk.open_journal();
@@ -483,7 +483,7 @@ int disk_tool_t::get_meta_partition(std::vector<vitastor_dev_info_t> & ssds, std
{
blockstore_disk_t dsk;
dsk.parse_config(options);
dsk.data_io = dsk.meta_io = dsk.journal_io = "direct";
dsk.cached_io_data = dsk.cached_io_meta = dsk.cached_io_journal = false;
dsk.open_data();
dsk.open_meta();
dsk.open_journal();

View File

@@ -91,7 +91,7 @@ int disk_tool_t::resize_parse_params()
try
{
dsk.parse_config(options);
dsk.data_io = dsk.meta_io = dsk.journal_io = "direct";
dsk.cached_io_data = dsk.cached_io_meta = dsk.cached_io_journal = false;
dsk.open_data();
dsk.open_meta();
dsk.open_journal();

View File

@@ -24,7 +24,6 @@
#include <netinet/tcp.h>
#include <vector>
#include <string>
#include "vitastor_c.h"
#include "fio_headers.h"
@@ -204,15 +203,6 @@ static void watch_callback(void *opaque, long watch)
bsd->watch = (void*)watch;
}
static void opt_push(std::vector<char *> & options, const char *opt, const char *value)
{
if (value)
{
options.push_back(strdup(opt));
options.push_back(strdup(value));
}
}
static int sec_setup(struct thread_data *td)
{
sec_options *o = (sec_options*)td->eo;
@@ -264,27 +254,8 @@ static int sec_setup(struct thread_data *td)
{
o->inode = 0;
}
std::vector<char *> options;
opt_push(options, "config_path", o->config_path);
opt_push(options, "etcd_address", o->etcd_host);
opt_push(options, "etcd_prefix", o->etcd_prefix);
if (o->use_rdma != -1)
opt_push(options, "use_rdma", std::to_string(o->use_rdma).c_str());
opt_push(options, "rdma_device", o->rdma_device);
if (o->rdma_port_num)
opt_push(options, "rdma_port_num", std::to_string(o->rdma_port_num).c_str());
if (o->rdma_gid_index)
opt_push(options, "rdma_gid_index", std::to_string(o->rdma_gid_index).c_str());
if (o->rdma_mtu)
opt_push(options, "rdma_mtu", std::to_string(o->rdma_mtu).c_str());
if (o->cluster_log)
opt_push(options, "log_level", std::to_string(o->cluster_log).c_str());
// allow writeback caching if -direct is not set
opt_push(options, "client_writeback_allowed", td->o.odirect ? "0" : "1");
bsd->cli = vitastor_c_create_uring_json((const char**)options.data(), options.size());
for (auto opt: options)
free(opt);
options.clear();
bsd->cli = vitastor_c_create_uring(o->config_path, o->etcd_host, o->etcd_prefix,
o->use_rdma, o->rdma_device, o->rdma_port_num, o->rdma_gid_index, o->rdma_mtu, o->cluster_log);
if (o->image)
{
bsd->watch = NULL;

View File

@@ -11,9 +11,6 @@
#include "addr_util.h"
#include "messenger.h"
#ifdef WITH_RDMA
#include "msgr_rdma.h"
#endif
void osd_messenger_t::init()
{
@@ -395,27 +392,24 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
},
},
};
json11::Json::object payload;
if (this->osd_num)
{
payload["osd_num"] = this->osd_num;
}
#ifdef WITH_RDMA
if (rdma_context)
{
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
if (cl->rdma_conn)
{
payload["connect_rdma"] = cl->rdma_conn->addr.to_string();
payload["rdma_max_msg"] = cl->rdma_conn->max_msg;
json11::Json payload = json11::Json::object {
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
{ "rdma_max_msg", cl->rdma_conn->max_msg },
};
std::string payload_str = payload.dump();
op->req.show_conf.json_len = payload_str.size();
op->buf = malloc_or_die(payload_str.size());
op->iov.push_back(op->buf, payload_str.size());
memcpy(op->buf, payload_str.c_str(), payload_str.size());
}
}
#endif
std::string payload_str = json11::Json(payload).dump();
op->req.show_conf.json_len = payload_str.size();
op->buf = malloc_or_die(payload_str.size());
op->iov.push_back(op->buf, payload_str.size());
memcpy(op->buf, payload_str.c_str(), payload_str.size());
op->callback = [this, cl](osd_op_t *op)
{
std::string json_err;

View File

@@ -18,6 +18,10 @@
#include "timerfd_manager.h"
#include <ringloop.h>
#ifdef WITH_RDMA
#include "msgr_rdma.h"
#endif
#define CL_READ_HDR 1
#define CL_READ_DATA 2
#define CL_READ_REPLY_DATA 3
@@ -40,11 +44,6 @@ struct msgr_sendp_t
int flags;
};
#ifdef WITH_RDMA
struct msgr_rdma_connection_t;
struct msgr_rdma_context_t;
#endif
struct osd_client_t
{
int refs = 0;

View File

@@ -55,10 +55,3 @@ json11::Json::object osd_messenger_t::merge_configs(const json11::Json::object &
{
return cli_config;
}
bool json_is_true(const json11::Json & val)
{
if (val.is_string())
return val == "true" || val == "yes" || val == "1";
return val.bool_value();
}

View File

@@ -22,10 +22,4 @@ public:
void submit()
{
}
void wait()
{
}
void loop()
{
}
};

View File

@@ -5,9 +5,6 @@
#include <assert.h>
#include "messenger.h"
#ifdef WITH_RDMA
#include "msgr_rdma.h"
#endif
void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
{

View File

@@ -341,7 +341,6 @@ public:
ringloop->loop();
ringloop->wait();
}
cli->flush();
delete cli;
delete epmgr;
delete ringloop;

View File

@@ -34,10 +34,7 @@ nfs_proxy_t::~nfs_proxy_t()
if (cmd)
delete cmd;
if (cli)
{
cli->flush();
delete cli;
}
if (epmgr)
delete epmgr;
if (ringloop)
@@ -264,8 +261,16 @@ void nfs_proxy_t::run(json11::Json cfg)
ringloop->loop();
ringloop->wait();
}
/*// Sync at the end
cluster_op_t *close_sync = new cluster_op_t;
close_sync->opcode = OSD_OP_SYNC;
close_sync->callback = [&stop](cluster_op_t *op)
{
stop = true;
delete op;
};
cli->execute(close_sync);*/
// Destroy the client
cli->flush();
delete cli;
delete epmgr;
delete ringloop;

View File

@@ -184,14 +184,6 @@ void osd_t::parse_config(bool init)
// Allow to set it to 0
autosync_writes = config["autosync_writes"].uint64_value();
}
if (!config["fsync_feedback_repeat_interval"].is_null())
{
fsync_feedback_repeat_interval = config["fsync_feedback_repeat_interval"].uint64_value();
}
if (!fsync_feedback_repeat_interval)
{
fsync_feedback_repeat_interval = 500; // ms
}
if (!config["client_queue_depth"].is_null())
{
client_queue_depth = config["client_queue_depth"].uint64_value();

View File

@@ -122,7 +122,6 @@ class osd_t
uint32_t scrub_list_limit = 1000;
bool scrub_find_best = true;
uint64_t scrub_ec_max_bruteforce = 100;
uint64_t fsync_feedback_repeat_interval = 500;
// cluster state
@@ -167,8 +166,6 @@ class osd_t
uint64_t unstable_write_count = 0;
std::map<osd_object_id_t, uint64_t> unstable_writes;
std::deque<osd_op_t*> syncs_in_progress;
std::map<int, timespec> unstable_write_osds;
int fsync_feedback_timer_id = -1;
// client & peer I/O
@@ -260,7 +257,6 @@ class osd_t
void exec_show_config(osd_op_t *cur_op);
void exec_secondary(osd_op_t *cur_op);
void secondary_op_callback(osd_op_t *cur_op);
void fsync_feedback();
// primary ops
void autosync();

View File

@@ -186,12 +186,10 @@ json11::Json osd_t::get_statistics()
if (bs)
{
st["blockstore_ready"] = bs->is_started();
st["data_block_size"] = (uint64_t)bs->get_block_size();
st["size"] = bs->get_block_count() * bs->get_block_size();
st["free"] = bs->get_free_block_count() * bs->get_block_size();
}
st["data_block_size"] = (uint64_t)bs_block_size;
st["bitmap_granularity"] = (uint64_t)bs_bitmap_granularity;
st["immediate_commit"] = immediate_commit == IMMEDIATE_ALL ? "all" : (immediate_commit == IMMEDIATE_SMALL ? "small" : "none");
st["host"] = self_state["host"];
json11::Json::object op_stats, subop_stats;
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)

View File

@@ -2,9 +2,6 @@
// License: VNPL-1.1 (see README.md for details)
#include "osd.h"
#ifdef WITH_RDMA
#include "msgr_rdma.h"
#endif
#include "json11/json11.hpp"
@@ -29,23 +26,6 @@ void osd_t::secondary_op_callback(osd_op_t *op)
if (op->bs_op->retval > 0)
op->iov.push_back(op->buf, op->bs_op->retval);
}
else if (op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE)
{
#ifndef OSD_STUB
fsync_feedback();
#endif
if (op->req.hdr.opcode == OSD_OP_SEC_WRITE)
{
auto & u = unstable_write_osds[op->peer_fd];
u = u;
}
}
else if (op->req.hdr.opcode == OSD_OP_SEC_SYNC)
{
// FIXME It would be more correct to track STABILIZE ops, not just reset on SYNC
unstable_write_osds.erase(op->peer_fd);
}
else if (op->req.hdr.opcode == OSD_OP_SEC_LIST)
{
// allocated by blockstore
@@ -62,71 +42,6 @@ void osd_t::secondary_op_callback(osd_op_t *op)
finish_op(op, retval);
}
void osd_t::fsync_feedback()
{
if (!unstable_write_osds.size() || !bs->wants_fsync())
{
return;
}
bool postpone = false;
// Broadcast fsync feedback
timespec now;
clock_gettime(CLOCK_REALTIME, &now);
for (auto up_it = unstable_write_osds.begin(); up_it != unstable_write_osds.end(); )
{
auto & peer_fd = up_it->first;
auto & last_feedback = up_it->second;
if (msgr.clients.find(peer_fd) == msgr.clients.end() ||
!msgr.clients.at(peer_fd)->osd_num)
{
unstable_write_osds.erase(up_it++);
continue;
}
auto diff = (now.tv_sec-last_feedback.tv_sec)*1000 + (now.tv_nsec-last_feedback.tv_nsec)/1000000;
if (diff > fsync_feedback_repeat_interval)
{
last_feedback = now;
// Request fsync from the primary OSD
// Note: primary OSD should NOT divide syncs by clients or this logic will break
osd_op_t *fb_op = new osd_op_t();
fb_op->op_type = OSD_OP_OUT;
fb_op->req = (osd_any_op_t){
.sync = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = msgr.next_subop_id++,
.opcode = OSD_OP_SYNC,
},
},
};
fb_op->callback = [this](osd_op_t *op)
{
delete op;
};
fb_op->peer_fd = peer_fd;
msgr.outbox_push(fb_op);
}
else
{
postpone = true;
}
up_it++;
}
if (fsync_feedback_timer_id >= 0)
{
tfd->clear_timer(fsync_feedback_timer_id);
fsync_feedback_timer_id = -1;
}
if (postpone)
{
fsync_feedback_timer_id = tfd->set_timer(fsync_feedback_repeat_interval, false, [this](int timer_id)
{
fsync_feedback_timer_id = -1;
fsync_feedback();
});
}
}
void osd_t::exec_secondary(osd_op_t *cur_op)
{
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
@@ -240,7 +155,6 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
json11::Json req_json = cur_op->req.show_conf.json_len > 0
? json11::Json::parse(std::string((char *)cur_op->buf), json_err)
: json11::Json();
msgr.clients.at(cur_op->peer_fd)->osd_num = req_json["osd_num"].uint64_value();
// Expose sensitive configuration values so peers can check them
json11::Json::object wire_config = json11::Json::object {
{ "osd_num", osd_num },

View File

@@ -388,43 +388,6 @@ static void vitastor_aio_set_fd_handler(void *vcli, int fd, int unused1, IOHandl
);
}
typedef struct str_array
{
const char **items;
int len, alloc;
} str_array;
static void strarray_push(str_array *a, const char *str)
{
if (a->len >= a->alloc)
{
a->alloc = !a->alloc ? 4 : 2*a->alloc;
a->items = (const char**)realloc(a->items, a->alloc*sizeof(char*));
if (!a->items)
{
fprintf(stderr, "bad alloc\n");
abort();
}
}
a->items[a->len++] = str;
}
static void strarray_push_kv(str_array *a, const char *key, const char *value)
{
if (key && value)
{
strarray_push(a, key);
strarray_push(a, value);
}
}
static void strarray_free(str_array *a)
{
free(a->items);
a->items = NULL;
a->len = a->alloc = 0;
}
static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, Error **errp)
{
VitastorRPC task;
@@ -443,19 +406,23 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
client->rdma_gid_index = qdict_get_try_int(options, "rdma-gid-index", 0);
client->rdma_mtu = qdict_get_try_int(options, "rdma-mtu", 0);
client->ctx = bdrv_get_aio_context(bs);
str_array opt = {};
strarray_push_kv(&opt, "config_path", qdict_get_try_str(options, "config-path"));
strarray_push_kv(&opt, "etcd_address", qdict_get_try_str(options, "etcd-host"));
strarray_push_kv(&opt, "etcd_prefix", qdict_get_try_str(options, "etcd-prefix"));
strarray_push_kv(&opt, "use_rdma", qdict_get_try_str(options, "use-rdma"));
strarray_push_kv(&opt, "rdma_device", qdict_get_try_str(options, "rdma-device"));
strarray_push_kv(&opt, "rdma_port_num", qdict_get_try_str(options, "rdma-port-num"));
strarray_push_kv(&opt, "rdma_gid_index", qdict_get_try_str(options, "rdma-gid-index"));
strarray_push_kv(&opt, "rdma_mtu", qdict_get_try_str(options, "rdma-mtu"));
strarray_push_kv(&opt, "client_writeback_allowed", (flags & BDRV_O_NOCACHE) ? "0" : "1");
client->proxy = vitastor_c_create_uring_json(opt.items, opt.len);
strarray_free(&opt);
if (client->proxy)
#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
client->proxy = vitastor_c_create_qemu_uring(
vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
if (!client->proxy)
{
fprintf(stderr, "vitastor: failed to create io_uring: %s - I/O will be slower\n", strerror(errno));
client->uring_eventfd = -1;
#endif
client->proxy = vitastor_c_create_qemu(
vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
}
else
{
client->uring_eventfd = vitastor_c_uring_register_eventfd(client->proxy);
if (client->uring_eventfd < 0)
@@ -467,45 +434,7 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
}
universal_aio_set_fd_handler(client->ctx, client->uring_eventfd, vitastor_uring_handler, NULL, client);
}
else
{
// Writeback cache is unusable without io_uring because the client can't correctly flush on exit
fprintf(stderr, "vitastor: failed to create io_uring: %s - I/O will be slower%s\n",
strerror(errno), (flags & BDRV_O_NOCACHE ? "" : " and writeback cache will be disabled"));
client->uring_eventfd = -1;
#if defined VITASTOR_C_API_VERSION && VITASTOR_C_API_VERSION >= 2
client->proxy = vitastor_c_create_qemu_uring(
vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
if (!client->proxy)
{
fprintf(stderr, "vitastor: failed to create io_uring: %s - I/O will be slower\n", strerror(errno));
client->uring_eventfd = -1;
client->proxy = vitastor_c_create_qemu(
vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
}
else
{
client->uring_eventfd = vitastor_c_uring_register_eventfd(client->proxy);
if (client->uring_eventfd < 0)
{
fprintf(stderr, "vitastor: failed to create io_uring eventfd: %s\n", strerror(errno));
error_setg(errp, "failed to create io_uring eventfd");
vitastor_close(bs);
return -1;
}
universal_aio_set_fd_handler(client->ctx, client->uring_eventfd, vitastor_uring_handler, NULL, client);
}
#else
client->proxy = vitastor_c_create_qemu(
vitastor_aio_set_fd_handler, client, client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
#endif
}
image = client->image = g_strdup(qdict_get_try_str(options, "image"));
client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0;
// Get image metadata (size and readonly flag) or just wait until the client is ready

View File

@@ -4,7 +4,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include "cluster_client_impl.h"
#include "cluster_client.h"
void configure_single_pg_pool(cluster_client_t *cli)
{
@@ -47,11 +47,11 @@ void configure_single_pg_pool(cluster_client_t *cli)
cli->st_cli.on_change_hook(changes);
}
int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function<void()> cb = NULL, bool instant = false)
int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function<void()> cb = NULL)
{
printf("Post write %lx+%lx\n", offset, len);
int *r = new int;
*r = instant ? -2 : -1;
*r = -1;
cluster_op_t *op = new cluster_op_t();
op->opcode = OSD_OP_WRITE;
op->inode = 0x1000000000001;
@@ -72,13 +72,6 @@ int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c,
cb();
};
cli->execute(op);
if (instant)
{
long res = *r;
assert(*r >= 0);
delete r;
return (int*)res;
}
return r;
}
@@ -167,13 +160,6 @@ osd_op_t *find_op(cluster_client_t *cli, osd_num_t osd_num, uint64_t opcode, uin
}
op_it++;
}
op_it = cli->msgr.clients[peer_fd]->sent_ops.begin();
while (op_it != cli->msgr.clients[peer_fd]->sent_ops.end())
{
printf("Found opcode %lu offset %lx size %x\n", op_it->second->req.hdr.opcode, op_it->second->req.rw.offset, op_it->second->req.rw.len);
op_it++;
}
printf("Not found opcode %lu offset %lx size %lx\n", opcode, offset, len);
return NULL;
}
@@ -206,16 +192,11 @@ void test1()
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
check_completed(r1);
r1 = test_write(cli, 4096, 4096, 0x56);
can_complete(r1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 4096, 4096), 0);
check_completed(r1);
pretend_disconnected(cli, 1);
int *r2 = test_sync(cli);
pretend_connected(cli, 1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 8192), 0);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 4096), 0);
check_op_count(cli, 1, 1);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
@@ -340,8 +321,9 @@ void test1()
check_disconnected(cli, 1);
pretend_connected(cli, 1);
cli->continue_ops(true);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x2000), 0);
check_op_count(cli, 1, 2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0);
check_op_count(cli, 1, 1);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0);
@@ -355,7 +337,7 @@ void test1()
void test2()
{
writeback_cache_t *wb = new writeback_cache_t();
std::map<object_id, cluster_buffer_t> unsynced_writes;
cluster_op_t *op = new cluster_op_t();
op->opcode = OSD_OP_WRITE;
op->inode = 1;
@@ -364,19 +346,19 @@ void test2()
op->iov.push_back(malloc_or_die(4096*1024), 4096);
// 0-4k = 0x55
memset(op->iov.buf[0].iov_base, 0x55, op->iov.buf[0].iov_len);
wb->copy_write(op, CACHE_WRITTEN);
cluster_client_t::copy_write(op, unsynced_writes);
// 8k-12k = 0x66
op->offset = 8192;
memset(op->iov.buf[0].iov_base, 0x66, op->iov.buf[0].iov_len);
wb->copy_write(op, CACHE_WRITTEN);
cluster_client_t::copy_write(op, unsynced_writes);
// 4k-1M+4k = 0x77
op->len = op->iov.buf[0].iov_len = 1048576;
op->offset = 4096;
memset(op->iov.buf[0].iov_base, 0x77, op->iov.buf[0].iov_len);
wb->copy_write(op, CACHE_WRITTEN);
cluster_client_t::copy_write(op, unsynced_writes);
// check it
assert(wb->dirty_buffers.size() == 2);
auto uit = wb->dirty_buffers.begin();
assert(unsynced_writes.size() == 4);
auto uit = unsynced_writes.begin();
int i;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 0);
@@ -386,106 +368,35 @@ void test2()
uit++;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 4096);
assert(uit->second.len == 1048576);
assert(uit->second.len == 4096);
for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {}
assert(i == uit->second.len);
uit++;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 8192);
assert(uit->second.len == 4096);
for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {}
assert(i == uit->second.len);
uit++;
assert(uit->first.inode == 1);
assert(uit->first.stripe == 12*1024);
assert(uit->second.len == 1016*1024);
for (i = 0; i < uit->second.len && ((uint8_t*)uit->second.buf)[i] == 0x77; i++) {}
assert(i == uit->second.len);
uit++;
// free memory
free(op->iov.buf[0].iov_base);
delete op;
delete wb;
for (auto p: unsynced_writes)
{
free(p.second.buf);
}
printf("[ok] copy_write test\n");
}
void test_writeback()
{
json11::Json config = json11::Json::object {
{ "client_enable_writeback", true },
{ "client_writeback_allowed", true },
{ "client_max_buffered_bytes", 1024*1024 },
{ "client_max_buffered_ops", 2 },
{ "client_max_writeback_iodepth", 2 },
{ "client_max_dirty_bytes", 1024*1024 },
{ "client_max_dirty_ops", 2 },
};
timerfd_manager_t *tfd = new timerfd_manager_t([](int fd, bool wr, std::function<void(int, int)> callback){});
cluster_client_t *cli = new cluster_client_t(NULL, tfd, config);
configure_single_pg_pool(cli);
pretend_connected(cli, 1);
// Check that 3 consecutive writes are merged by writeback
assert((long)test_write(cli, 0, 4096, 0x55, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 4096, 4096, 0x55, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 8192, 4096, 0x55, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 1024*1024, 4096, 0x66, NULL, true) == 1);
check_op_count(cli, 1, 0);
// 3rd and 4th writes should trigger 1 writeback each
assert((long)test_write(cli, 2*1024*1024, 4096, 0x66, NULL, true) == 1);
check_op_count(cli, 1, 1);
assert((long)test_write(cli, 3*1024*1024, 4096, 0x66, NULL, true) == 1);
check_op_count(cli, 1, 2);
// 5th write should be postponed until at least 1 writeback is completed
int *r1 = test_write(cli, 4*1024*1024, 4096, 0x67, NULL);
check_op_count(cli, 1, 2);
can_complete(r1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 3*4096), 0);
check_completed(r1);
// autosync because max_dirty_ops=2, flush waits for sync
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 1024*1024, 4096), 0);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 2*1024*1024, 4096), 0);
check_op_count(cli, 1, 0);
int *r2 = test_sync(cli);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 3*1024*1024, 4096), 0);
check_op_count(cli, 1, 1);
// autosync because max_dirty_ops=2, flush waits for sync
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 4*1024*1024, 4096), 0);
check_op_count(cli, 1, 1);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
check_completed(r2);
// Check cutting of the beginning and end
assert((long)test_write(cli, 0, 32768, 0x55, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 32768, 32768, 0x56, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 16384, 32768, 0x57, NULL, true) == 1);
check_op_count(cli, 1, 0);
assert((long)test_write(cli, 16384+4096, 32768-4096, 0x58, NULL, true) == 1);
check_op_count(cli, 1, 0);
r2 = test_sync(cli);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 65536), 0);
check_op_count(cli, 1, 1);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_SYNC, 0, 0), 0);
check_completed(r2);
// Free client
delete cli;
delete tfd;
printf("[ok] writeback test\n");
}
int main(int narg, char *args[])
{
test1();
test2();
test_writeback();
return 0;
}

View File

@@ -129,7 +129,6 @@ again:
if (exp.it_value.tv_sec < 0 || exp.it_value.tv_sec == 0 && exp.it_value.tv_nsec <= 0)
{
// It already happened
// FIXME: Postpone to setImmediate/BH to avoid reenterability problems
trigger_nearest();
goto again;
}

View File

@@ -27,7 +27,6 @@ ETCD_COUNT=${ETCD_COUNT:-1}
if [ "$KEEP_DATA" = "" ]; then
rm -rf ./testdata
rm -rf /run/user/$(id -u)/testdata_etcd*
mkdir -p ./testdata
fi
@@ -42,9 +41,7 @@ ETCDCTL="${ETCD}ctl --endpoints=$ETCD_URL --dial-timeout=5s --command-timeout=10
start_etcd()
{
local i=$1
local t=/run/user/$(id -u)
findmnt $t >/dev/null || (sudo mkdir -p $t && sudo mount -t tmpfs tmpfs $t)
ionice -c2 -n0 $ETCD -name etcd$i --data-dir /run/user/$(id -u)/testdata_etcd$i \
ionice -c2 -n0 $ETCD -name etcd$i --data-dir ./testdata/etcd$i \
--advertise-client-urls http://$ETCD_IP:$((ETCD_PORT+2*i-2)) --listen-client-urls http://$ETCD_IP:$((ETCD_PORT+2*i-2)) \
--initial-advertise-peer-urls http://$ETCD_IP:$((ETCD_PORT+2*i-1)) --listen-peer-urls http://$ETCD_IP:$((ETCD_PORT+2*i-1)) \
--initial-cluster-token vitastor-tests-etcd --initial-cluster-state new \

View File

@@ -19,10 +19,10 @@ fi
if [ "$IMMEDIATE_COMMIT" != "" ]; then
NO_SAME="--journal_no_same_sector_overwrites true --journal_sector_buffer_count 1024 --disable_data_fsync 1 --immediate_commit all --log_level 10"
$ETCDCTL put /vitastor/config/global '{"recovery_queue_depth":1,"osd_out_time":1,"immediate_commit":"all","client_enable_writeback":true}'
$ETCDCTL put /vitastor/config/global '{"recovery_queue_depth":1,"osd_out_time":1,"immediate_commit":"all"}'
else
NO_SAME="--journal_sector_buffer_count 1024 --log_level 10"
$ETCDCTL put /vitastor/config/global '{"recovery_queue_depth":1,"osd_out_time":1,"client_enable_writeback":true}'
$ETCDCTL put /vitastor/config/global '{"recovery_queue_depth":1,"osd_out_time":1}'
fi
start_osd_on()