Compare commits

..

7 Commits

57 changed files with 1323 additions and 416 deletions

View File

@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 2.8.12)
cmake_minimum_required(VERSION 2.8)
project(vitastor)
set(VERSION "0.8.6")
set(VERSION "0.8.5")
add_subdirectory(src)

View File

@@ -1,4 +1,4 @@
VERSION ?= v0.8.6
VERSION ?= v0.8.5
all: build push

View File

@@ -49,7 +49,7 @@ spec:
capabilities:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
image: vitalif/vitastor-csi:v0.8.6
image: vitalif/vitastor-csi:v0.8.5
args:
- "--node=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)"

View File

@@ -116,7 +116,7 @@ spec:
privileged: true
capabilities:
add: ["SYS_ADMIN"]
image: vitalif/vitastor-csi:v0.8.6
image: vitalif/vitastor-csi:v0.8.5
args:
- "--node=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)"

View File

@@ -5,7 +5,7 @@ package vitastor
const (
vitastorCSIDriverName = "csi.vitastor.io"
vitastorCSIDriverVersion = "0.8.6"
vitastorCSIDriverVersion = "0.8.5"
)
// Config struct fills the parameters of request or user input

View File

@@ -6,7 +6,6 @@ package vitastor
import (
"context"
"encoding/json"
"fmt"
"strings"
"bytes"
"strconv"
@@ -179,7 +178,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
// Create image using vitastor-cli
_, err := invokeCLI(ctxVars, []string{ "create", volName, "-s", fmt.Sprintf("%v", volSize), "--pool", fmt.Sprintf("%v", poolId) })
_, err := invokeCLI(ctxVars, []string{ "create", volName, "-s", string(volSize), "--pool", string(poolId) })
if (err != nil)
{
if (strings.Index(err.Error(), "already exists") > 0)

4
debian/changelog vendored
View File

@@ -1,10 +1,10 @@
vitastor (0.8.6-1) unstable; urgency=medium
vitastor (0.8.5-1) unstable; urgency=medium
* Bugfixes
-- Vitaliy Filippov <vitalif@yourcmc.ru> Fri, 03 Jun 2022 02:09:44 +0300
vitastor (0.8.6-1) unstable; urgency=medium
vitastor (0.8.5-1) unstable; urgency=medium
* Implement NFS proxy
* Add documentation

View File

@@ -34,8 +34,8 @@ RUN set -e -x; \
mkdir -p /root/packages/vitastor-$REL; \
rm -rf /root/packages/vitastor-$REL/*; \
cd /root/packages/vitastor-$REL; \
cp -r /root/vitastor vitastor-0.8.6; \
cd vitastor-0.8.6; \
cp -r /root/vitastor vitastor-0.8.5; \
cd vitastor-0.8.5; \
ln -s /root/fio-build/fio-*/ ./fio; \
FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
ls /usr/include/linux/raw.h || cp ./debian/raw.h /usr/include/linux/raw.h; \
@@ -48,8 +48,8 @@ RUN set -e -x; \
rm -rf a b; \
echo "dep:fio=$FIO" > debian/fio_version; \
cd /root/packages/vitastor-$REL; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.8.6.orig.tar.xz vitastor-0.8.6; \
cd vitastor-0.8.6; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.8.5.orig.tar.xz vitastor-0.8.5; \
cd vitastor-0.8.5; \
V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \

View File

@@ -19,7 +19,6 @@ between clients, OSDs and etcd.
- [rdma_max_sge](#rdma_max_sge)
- [rdma_max_msg](#rdma_max_msg)
- [rdma_max_recv](#rdma_max_recv)
- [rdma_max_send](#rdma_max_send)
- [peer_connect_interval](#peer_connect_interval)
- [peer_connect_timeout](#peer_connect_timeout)
- [osd_idle_timeout](#osd_idle_timeout)
@@ -75,12 +74,6 @@ to work. For example, Mellanox ConnectX-3 and older adapters don't have
Implicit ODP, so they're unsupported by Vitastor. Run `ibv_devinfo -v` as
root to list available RDMA devices and their features.
Remember that you also have to configure your network switches if you use
RoCE/RoCEv2, otherwise you may experience unstable performance. Refer to
the manual of your network vendor for details about setting up the switch
for RoCEv2 correctly. Usually it means setting up Lossless Ethernet with
PFC (Priority Flow Control) and ECN (Explicit Congestion Notification).
## rdma_port_num
- Type: integer
@@ -123,30 +116,20 @@ required to change this parameter.
## rdma_max_msg
- Type: integer
- Default: 132096
- Default: 1048576
Maximum size of a single RDMA send or receive operation in bytes.
## rdma_max_recv
- Type: integer
- Default: 16
Maximum number of RDMA receive buffers per connection (RDMA requires
preallocated buffers to receive data). Each buffer is `rdma_max_msg` bytes
in size. So this setting directly affects memory usage: a single Vitastor
RDMA client uses `rdma_max_recv * rdma_max_msg * OSD_COUNT` bytes of memory.
Default is roughly 2 MB * number of OSDs.
## rdma_max_send
- Type: integer
- Default: 8
Maximum number of outstanding RDMA send operations per connection. Should be
less than `rdma_max_recv` so the receiving side doesn't run out of buffers.
Doesn't affect memory usage - additional memory isn't allocated for send
operations.
Maximum number of parallel RDMA receive operations. Note that this number
of receive buffers `rdma_max_msg` in size are allocated for each client,
so this setting actually affects memory usage. This is because RDMA receive
operations are (sadly) still not zero-copy in Vitastor. It may be fixed in
later versions.
## peer_connect_interval

View File

@@ -19,7 +19,6 @@
- [rdma_max_sge](#rdma_max_sge)
- [rdma_max_msg](#rdma_max_msg)
- [rdma_max_recv](#rdma_max_recv)
- [rdma_max_send](#rdma_max_send)
- [peer_connect_interval](#peer_connect_interval)
- [peer_connect_timeout](#peer_connect_timeout)
- [osd_idle_timeout](#osd_idle_timeout)
@@ -79,13 +78,6 @@ Implicit On-Demand Paging (Implicit ODP) и Scatter/Gather (SG). Наприме
суперпользователя, чтобы посмотреть список доступных RDMA-устройств, их
параметры и возможности.
Обратите внимание, что если вы используете RoCE/RoCEv2, вам также необходимо
правильно настроить для него коммутаторы, иначе вы можете столкнуться с
нестабильной производительностью. Подробную информацию о настройке
коммутатора для RoCEv2 ищите в документации производителя. Обычно это
подразумевает настройку сети без потерь на основе PFC (Priority Flow
Control) и ECN (Explicit Congestion Notification).
## rdma_port_num
- Тип: целое число
@@ -129,32 +121,22 @@ OSD в любом случае согласовывают реальное зн
## rdma_max_msg
- Тип: целое число
- Значение по умолчанию: 132096
- Значение по умолчанию: 1048576
Максимальный размер одной RDMA-операции отправки или приёма.
## rdma_max_recv
- Тип: целое число
- Значение по умолчанию: 16
Максимальное число буферов для RDMA-приёма данных на одно соединение
(RDMA требует заранее выделенных буферов для приёма данных). Каждый буфер
имеет размер `rdma_max_msg` байт. Таким образом, настройка прямо влияет на
потребление памяти - один Vitastor-клиент с RDMA использует
`rdma_max_recv * rdma_max_msg * ЧИСЛО_OSD` байт памяти, по умолчанию -
примерно 2 МБ * число OSD.
## rdma_max_send
- Тип: целое число
- Значение по умолчанию: 8
Максимальное число RDMA-операций отправки, отправляемых в очередь одного
соединения. Желательно, чтобы оно было меньше `rdma_max_recv`, чтобы
у принимающей стороны в процессе работы не заканчивались буферы на приём.
Не влияет на потребление памяти - дополнительная память на операции отправки
не выделяется.
Максимальное число параллельных RDMA-операций получения данных. Следует
иметь в виду, что данное число буферов размером `rdma_max_msg` выделяется
для каждого подключённого клиентского соединения, так что данная настройка
влияет на потребление памяти. Это так потому, что RDMA-приём данных в
Vitastor, увы, всё равно не является zero-copy, т.е. всё равно 1 раз
копирует данные в памяти. Данная особенность, возможно, будет исправлена в
более новых версиях Vitastor.
## peer_connect_interval

View File

@@ -53,12 +53,6 @@
to work. For example, Mellanox ConnectX-3 and older adapters don't have
Implicit ODP, so they're unsupported by Vitastor. Run `ibv_devinfo -v` as
root to list available RDMA devices and their features.
Remember that you also have to configure your network switches if you use
RoCE/RoCEv2, otherwise you may experience unstable performance. Refer to
the manual of your network vendor for details about setting up the switch
for RoCEv2 correctly. Usually it means setting up Lossless Ethernet with
PFC (Priority Flow Control) and ECN (Explicit Congestion Notification).
info_ru: |
Название RDMA-устройства для связи с Vitastor OSD (например, "rocep5s0f0").
Имейте в виду, что поддержка RDMA в Vitastor требует функций устройства
@@ -67,13 +61,6 @@
потому не поддерживаются в Vitastor. Запустите `ibv_devinfo -v` от имени
суперпользователя, чтобы посмотреть список доступных RDMA-устройств, их
параметры и возможности.
Обратите внимание, что если вы используете RoCE/RoCEv2, вам также необходимо
правильно настроить для него коммутаторы, иначе вы можете столкнуться с
нестабильной производительностью. Подробную информацию о настройке
коммутатора для RoCEv2 ищите в документации производителя. Обычно это
подразумевает настройку сети без потерь на основе PFC (Priority Flow
Control) и ECN (Explicit Congestion Notification).
- name: rdma_port_num
type: int
default: 1
@@ -127,39 +114,26 @@
так что менять этот параметр обычно не нужно.
- name: rdma_max_msg
type: int
default: 132096
default: 1048576
info: Maximum size of a single RDMA send or receive operation in bytes.
info_ru: Максимальный размер одной RDMA-операции отправки или приёма.
- name: rdma_max_recv
type: int
default: 16
info: |
Maximum number of RDMA receive buffers per connection (RDMA requires
preallocated buffers to receive data). Each buffer is `rdma_max_msg` bytes
in size. So this setting directly affects memory usage: a single Vitastor
RDMA client uses `rdma_max_recv * rdma_max_msg * OSD_COUNT` bytes of memory.
Default is roughly 2 MB * number of OSDs.
info_ru: |
Максимальное число буферов для RDMA-приёма данных на одно соединение
(RDMA требует заранее выделенных буферов для приёма данных). Каждый буфер
имеет размер `rdma_max_msg` байт. Таким образом, настройка прямо влияет на
потребление памяти - один Vitastor-клиент с RDMA использует
`rdma_max_recv * rdma_max_msg * ЧИСЛО_OSD` байт памяти, по умолчанию -
примерно 2 МБ * число OSD.
- name: rdma_max_send
type: int
default: 8
info: |
Maximum number of outstanding RDMA send operations per connection. Should be
less than `rdma_max_recv` so the receiving side doesn't run out of buffers.
Doesn't affect memory usage - additional memory isn't allocated for send
operations.
Maximum number of parallel RDMA receive operations. Note that this number
of receive buffers `rdma_max_msg` in size are allocated for each client,
so this setting actually affects memory usage. This is because RDMA receive
operations are (sadly) still not zero-copy in Vitastor. It may be fixed in
later versions.
info_ru: |
Максимальное число RDMA-операций отправки, отправляемых в очередь одного
соединения. Желательно, чтобы оно было меньше `rdma_max_recv`, чтобы
у принимающей стороны в процессе работы не заканчивались буферы на приём.
Не влияет на потребление памяти - дополнительная память на операции отправки
не выделяется.
Максимальное число параллельных RDMA-операций получения данных. Следует
иметь в виду, что данное число буферов размером `rdma_max_msg` выделяется
для каждого подключённого клиентского соединения, так что данная настройка
влияет на потребление памяти. Это так потому, что RDMA-приём данных в
Vitastor, увы, всё равно не является zero-copy, т.е. всё равно 1 раз
копирует данные в памяти. Данная особенность, возможно, будет исправлена в
более новых версиях Vitastor.
- name: peer_connect_interval
type: sec
min: 1

View File

@@ -35,24 +35,15 @@ Write amplification for 4 KB blocks is usually 3-5 in Vitastor:
If you manage to get an SSD which handles 512 byte blocks well (Optane?) you may
lower 1, 3 and 4 to 512 bytes (1/8 of data size) and get WA as low as 2.375.
Implemented NVDIMM support can basically eliminate WA at all - all extra writes will
go to DRAM memory. But this requires a test cluster with NVDIMM - please contact me
if you want to provide me with such cluster for tests.
Lazy fsync also reduces WA for parallel workloads because journal blocks are only
written when they fill up or fsync is requested.
## In Practice
In practice, using tests from [Understanding Performance](understanding.en.md), decent TCP network,
good server-grade SSD/NVMe drives and disabled CPU power saving, you should head for:
In practice, using tests from [Understanding Performance](understanding.en.md)
and good server-grade SSD/NVMe drives, you should head for:
- At least 5000 T1Q1 replicated read and write iops (maximum 0.2ms latency)
- At least 5000 T1Q1 EC read IOPS and at least 2200 EC write IOPS (maximum 0.45ms latency)
- At least ~80k parallel read iops or ~30k write iops per 1 core (1 OSD)
- Disk-speed or wire-speed linear reads and writes, whichever is the bottleneck in your case
Lower results may mean that you have bad drives, bad network or some kind of misconfiguration.
Current latency records:
- 9668 T1Q1 replicated write iops (0.103 ms latency) with TCP and NVMe
- 9143 T1Q1 replicated read iops (0.109 ms latency) with TCP and NVMe

View File

@@ -36,25 +36,6 @@ WA (мультипликатор записи) для 4 КБ блоков в Vit
Если вы найдёте SSD, хорошо работающий с 512-байтными блоками данных (Optane?),
то 1, 3 и 4 можно снизить до 512 байт (1/8 от размера данных) и получить WA всего 2.375.
Если реализовать поддержку NVDIMM, то WA можно, условно говоря, ликвидировать вообще - все
дополнительные операции записи смогут обслуживаться DRAM памятью. Но для этого необходим
тестовый кластер с NVDIMM - пишите, если готовы предоставить такой для тестов.
Кроме того, WA снижается при использовании отложенного/ленивого сброса при параллельной
нагрузке, т.к. блоки журнала записываются на диск только когда они заполняются или явным
образом запрашивается fsync.
## На практике
На практике, используя тесты fio со страницы [Понимание сути производительности систем хранения](understanding.ru.md),
нормальную TCP-сеть, хорошие серверные SSD/NVMe, при отключённом энергосбережении процессоров вы можете рассчитывать на:
- От 5000 IOPS в 1 поток (T1Q1) и на чтение, и на запись при использовании репликации (задержка до 0.2мс)
- От 5000 IOPS в 1 поток (T1Q1) на чтение и 2200 IOPS в 1 поток на запись при использовании EC (задержка до 0.45мс)
- От 80000 IOPS на чтение в параллельном режиме на 1 ядро, от 30000 IOPS на запись на 1 ядро (на 1 OSD)
- Скорость параллельного линейного чтения и записи, равная меньшему значению из скорости дисков или сети
Худшие результаты означают, что у вас либо медленные диски, либо медленная сеть, либо что-то неправильно настроено.
Зафиксированный на данный момент рекорд задержки:
- 9668 IOPS (0.103 мс задержка) в 1 поток (T1Q1) на запись с TCP и NVMe при использовании репликации
- 9143 IOPS (0.109 мс задержка) в 1 поток (T1Q1) на чтение с TCP и NVMe при использовании репликации

View File

@@ -1,4 +1,4 @@
[Documentation](../../README.md#documentation) → Usage → Disk management tool
[Documentation](../../README.md#documentation) → Usage → Disk Tool
-----

View File

@@ -1,4 +1,4 @@
[Документация](../../README-ru.md#документация) → Использование → Инструмент управления дисками
[Документация](../../README-ru.md#документация) → Использование → Управление дисками
-----

View File

@@ -70,8 +70,8 @@ const etcd_tree = {
rdma_gid_index: 0,
rdma_mtu: 4096,
rdma_max_sge: 128,
rdma_max_send: 8,
rdma_max_recv: 16,
rdma_max_send: 64,
rdma_max_recv: 128,
rdma_max_msg: 132096,
log_level: 0,
block_size: 131072,
@@ -107,6 +107,10 @@ const etcd_tree = {
slow_log_interval: 10,
inode_vanish_time: 60,
osd_memlock: false,
scrub_interval: '30d', // 1s/1m/1h/1d
scrub_queue_depth: 1,
scrub_sleep: 0, // milliseconds
scrub_list_limit: 1000, // objects to list on one scrub iteration
// blockstore - fixed in superblock
block_size,
disk_alignment,
@@ -168,6 +172,8 @@ const etcd_tree = {
osd_tags?: 'nvme' | [ 'nvme', ... ],
// prefer to put primary on OSD with these tags
primary_affinity_tags?: 'nvme' | [ 'nvme', ... ],
// scrub interval
scrub_interval?: '30d',
},
...
}, */
@@ -262,8 +268,8 @@ const etcd_tree = {
<pg_id>: {
primary: osd_num_t,
state: ("starting"|"peering"|"incomplete"|"active"|"repeering"|"stopping"|"offline"|
"degraded"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"|
"has_invalid"|"left_on_dead")[],
"degraded"|"has_corrupted"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"|
"has_invalid"|"left_on_dead"|"scrubbing")[],
}
}, */
},
@@ -285,6 +291,7 @@ const etcd_tree = {
osd_sets: osd_num_t[][],
all_peers: osd_num_t[],
epoch: uint64_t,
scrub_ts: uint64_t,
},
}, */
},

View File

@@ -50,7 +50,7 @@ from cinder.volume import configuration
from cinder.volume import driver
from cinder.volume import volume_utils
VERSION = '0.8.6'
VERSION = '0.8.5'
LOG = logging.getLogger(__name__)

View File

@@ -25,4 +25,4 @@ rm fio
mv fio-copy fio
FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
tar --transform 's#^#vitastor-0.8.6/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.8.6$(rpm --eval '%dist').tar.gz *
tar --transform 's#^#vitastor-0.8.5/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.8.5$(rpm --eval '%dist').tar.gz *

View File

@@ -35,7 +35,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.8.6.el7.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.8.5.el7.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 0.8.6
Version: 0.8.5
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.8.6.el7.tar.gz
Source0: vitastor-0.8.5.el7.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel

View File

@@ -35,7 +35,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.8.6.el8.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.8.5.el8.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 0.8.6
Version: 0.8.5
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.8.6.el8.tar.gz
Source0: vitastor-0.8.5.el8.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel

View File

@@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 2.8.12)
cmake_minimum_required(VERSION 2.8)
project(vitastor)
@@ -16,7 +16,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}")
endif()
add_definitions(-DVERSION="0.8.6")
add_definitions(-DVERSION="0.8.5")
add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fdiagnostics-color=always -I ${CMAKE_SOURCE_DIR}/src)
if (${WITH_ASAN})
add_definitions(-fsanitize=address -fno-omit-frame-pointer)
@@ -111,7 +111,7 @@ target_compile_options(vitastor_common PUBLIC -fPIC)
add_executable(vitastor-osd
osd_main.cpp osd.cpp osd_secondary.cpp osd_peering.cpp osd_flush.cpp osd_peering_pg.cpp
osd_primary.cpp osd_primary_chain.cpp osd_primary_sync.cpp osd_primary_write.cpp osd_primary_subops.cpp
osd_cluster.cpp osd_rmw.cpp
osd_cluster.cpp osd_rmw.cpp osd_scrub.cpp
)
target_link_libraries(vitastor-osd
vitastor_common
@@ -299,7 +299,7 @@ add_executable(test_cluster_client
EXCLUDE_FROM_ALL
test_cluster_client.cpp
pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
etcd_state_client.cpp timerfd_manager.cpp ../json11/json11.cpp
etcd_state_client.cpp timerfd_manager.cpp str_util.cpp ../json11/json11.cpp
)
target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__)
target_include_directories(test_cluster_client PUBLIC ${CMAKE_SOURCE_DIR}/src/mock)

View File

@@ -122,11 +122,14 @@ Output:
Get a list of all objects in this Blockstore.
Input:
- oid.stripe = PG alignment
- len = PG count or 0 to list all objects
- offset = PG number
- oid.inode = min inode number or 0 to list all inodes
- version = max inode number or 0 to list all inodes
- pg_alignment = PG alignment
- pg_count = PG count or 0 to list all objects
- pg_number = PG number
- list_stable_limit = max number of clean objects in the reply
it's guaranteed that dirty objects are returned from the same interval,
i.e. from (min_oid .. min(max_oid, max(returned stable OIDs)))
- min_oid = min inode/stripe or 0 to list all objects
- max_oid = max inode/stripe or 0 to list all objects
Output:
- retval = total obj_ver_id count
@@ -143,10 +146,27 @@ struct blockstore_op_t
uint64_t opcode;
// finish callback
std::function<void (blockstore_op_t*)> callback;
object_id oid;
uint64_t version;
uint32_t offset;
uint32_t len;
union
{
// R/W
struct
{
object_id oid;
uint64_t version;
uint32_t offset;
uint32_t len;
};
// List
struct __attribute__((__packed__))
{
object_id min_oid;
object_id max_oid;
uint32_t pg_alignment;
uint32_t pg_count;
uint32_t pg_number;
uint32_t list_stable_limit;
};
};
void *buf;
void *bitmap;
int retval;

View File

@@ -445,11 +445,11 @@ void blockstore_impl_t::reshard_clean_db(pool_id_t pool, uint32_t pg_count, uint
void blockstore_impl_t::process_list(blockstore_op_t *op)
{
uint32_t list_pg = op->offset+1;
uint32_t pg_count = op->len;
uint64_t pg_stripe_size = op->oid.stripe;
uint64_t min_inode = op->oid.inode;
uint64_t max_inode = op->version;
uint32_t list_pg = op->pg_number+1;
uint32_t pg_count = op->pg_count;
uint64_t pg_stripe_size = op->pg_alignment;
uint64_t min_inode = op->min_oid.inode;
uint64_t max_inode = op->max_oid.inode;
// Check PG
if (pg_count != 0 && (pg_stripe_size < MIN_DATA_BLOCK_SIZE || list_pg > pg_count))
{
@@ -496,7 +496,13 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
stable_alloc += clean_db.size();
}
}
else
if (op->list_stable_limit > 0)
{
stable_alloc = op->list_stable_limit;
if (stable_alloc > 1024*1024)
stable_alloc = 1024*1024;
}
if (stable_alloc < 32768)
{
stable_alloc = 32768;
}
@@ -507,22 +513,21 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
FINISH_OP(op);
return;
}
auto max_oid = op->max_oid;
bool limited = false;
for (auto shard_it = clean_db_shards.lower_bound(first_shard);
shard_it != clean_db_shards.end() && shard_it->first <= last_shard;
shard_it++)
{
auto & clean_db = shard_it->second;
auto clean_it = clean_db.begin(), clean_end = clean_db.end();
if ((min_inode != 0 || max_inode != 0) && min_inode <= max_inode)
if (op->min_oid.inode != 0 || op->min_oid.stripe != 0)
{
clean_it = clean_db.lower_bound({
.inode = min_inode,
.stripe = 0,
});
clean_end = clean_db.upper_bound({
.inode = max_inode,
.stripe = UINT64_MAX,
});
clean_it = clean_db.lower_bound(op->min_oid);
}
if ((max_oid.inode != 0 || max_oid.stripe != 0) && !(max_oid < op->min_oid))
{
clean_end = clean_db.upper_bound(max_oid);
}
for (; clean_it != clean_end; clean_it++)
{
@@ -541,11 +546,24 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
.oid = clean_it->first,
.version = clean_it->second.version,
};
if (op->list_stable_limit > 0 && !limited && stable_count >= op->list_stable_limit)
{
limited = true;
break;
}
}
if (op->list_stable_limit > 0 && first_shard != last_shard)
{
// To maintain the order, we have to include objects in the same range from other shards
std::sort(stable, stable+stable_count);
if (stable_count > op->list_stable_limit)
stable_count = op->list_stable_limit;
max_oid = stable[stable_count-1].oid;
}
}
if (first_shard != last_shard)
if (op->list_stable_limit == 0 && first_shard != last_shard)
{
// If that's not a per-PG listing, sort clean entries
// If that's not a per-PG listing, sort clean entries (already sorted if list_stable_limit != 0)
std::sort(stable, stable+stable_count);
}
int clean_stable_count = stable_count;
@@ -554,20 +572,17 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
obj_ver_id *unstable = NULL;
{
auto dirty_it = dirty_db.begin(), dirty_end = dirty_db.end();
if ((min_inode != 0 || max_inode != 0) && min_inode <= max_inode)
if (op->min_oid.inode != 0 || op->min_oid.stripe != 0)
{
dirty_it = dirty_db.lower_bound({
.oid = {
.inode = min_inode,
.stripe = 0,
},
.oid = op->min_oid,
.version = 0,
});
}
if ((max_oid.inode != 0 || max_oid.stripe != 0) && !(max_oid < op->min_oid))
{
dirty_end = dirty_db.upper_bound({
.oid = {
.inode = max_inode,
.stripe = UINT64_MAX,
},
.oid = max_oid,
.version = UINT64_MAX,
});
}

View File

@@ -410,14 +410,17 @@ struct rm_osd_t
parent->cli->st_cli.etcd_prefix+"/pg/history/"+
std::to_string(pool_cfg.id)+"/"+std::to_string(pg_num)
);
auto hist = json11::Json::object {
{ "epoch", pg_cfg.epoch },
{ "all_peers", pg_cfg.all_peers },
{ "osd_sets", pg_cfg.target_history },
};
if (pg_cfg.scrub_ts)
hist["scrub_ts"] = pg_cfg.scrub_ts;
history_updates.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", history_key },
{ "value", base64_encode(json11::Json(json11::Json::object {
{ "epoch", pg_cfg.epoch },
{ "all_peers", pg_cfg.all_peers },
{ "osd_sets", pg_cfg.target_history },
}).dump()) },
{ "value", base64_encode(json11::Json(hist).dump()) },
} },
});
history_checks.push_back(json11::Json::object {

View File

@@ -281,7 +281,7 @@ void disk_tool_t::dump_journal_entry(int num, journal_entry *je, bool json)
if (je->big_write.size > sizeof(journal_entry_big_write))
{
printf(json ? ",\"bitmap\":\"" : " (bitmap: ");
for (int i = sizeof(journal_entry_big_write); i < je->big_write.size; i++)
for (int i = sizeof(journal_entry_big_write); i < je->small_write.size; i++)
{
printf("%02x", ((uint8_t*)je)[i]);
}

View File

@@ -26,7 +26,7 @@ int disk_tool_t::process_meta(std::function<void(blockstore_meta_header_v1_t *)>
buf_size = dsk.meta_len;
void *data = memalign_or_die(MEM_ALIGNMENT, buf_size);
lseek64(dsk.meta_fd, dsk.meta_offset, 0);
read_blocking(dsk.meta_fd, data, dsk.meta_block_size);
read_blocking(dsk.meta_fd, data, buf_size);
// Check superblock
blockstore_meta_header_v1_t *hdr = (blockstore_meta_header_v1_t *)data;
if (hdr->zero == 0 &&
@@ -41,11 +41,8 @@ int disk_tool_t::process_meta(std::function<void(blockstore_meta_header_v1_t *)>
if (buf_size % dsk.meta_block_size)
{
buf_size = 8*dsk.meta_block_size;
void *new_data = memalign_or_die(MEM_ALIGNMENT, buf_size);
memcpy(new_data, data, dsk.meta_block_size);
free(data);
data = new_data;
hdr = (blockstore_meta_header_v1_t *)data;
data = memalign_or_die(MEM_ALIGNMENT, buf_size);
}
}
dsk.bitmap_granularity = hdr->bitmap_granularity;

View File

@@ -7,8 +7,8 @@
#ifndef __MOCK__
#include "addr_util.h"
#include "http_client.h"
#include "str_util.h"
#endif
#include "str_util.h"
etcd_state_client_t::~etcd_state_client_t()
{
@@ -759,6 +759,10 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
fprintf(stderr, "Pool %u has invalid bitmap_granularity (must divide block_size), skipping pool\n", pool_id);
continue;
}
// Scrub Interval
pc.scrub_interval = parse_time(pool_item.second["scrub_interval"].string_value());
if (!pc.scrub_interval)
pc.scrub_interval = 0;
// Immediate Commit Mode
pc.immediate_commit = pool_item.second["immediate_commit"].is_string()
? (pool_item.second["immediate_commit"].string_value() == "all"
@@ -901,6 +905,8 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
}
// Read epoch
pg_cfg.epoch = value["epoch"].uint64_value();
// Scrub timestamp
pg_cfg.scrub_ts = parse_time(value["scrub_ts"].string_value());
if (on_change_pg_history_hook != NULL)
{
on_change_pg_history_hook(pool_id, pg_num);

View File

@@ -39,6 +39,7 @@ struct pg_config_t
osd_num_t cur_primary;
int cur_state;
uint64_t epoch;
uint64_t scrub_ts;
};
struct pool_config_t
@@ -55,6 +56,7 @@ struct pool_config_t
uint64_t max_osd_combinations;
uint64_t pg_stripe_size;
std::map<pg_num_t, pg_config_t> pg_config;
uint64_t scrub_interval;
};
struct inode_config_t

View File

@@ -157,10 +157,10 @@ void osd_messenger_t::parse_config(const json11::Json & config)
this->rdma_max_sge = 128;
this->rdma_max_send = config["rdma_max_send"].uint64_value();
if (!this->rdma_max_send)
this->rdma_max_send = 8;
this->rdma_max_send = 64;
this->rdma_max_recv = config["rdma_max_recv"].uint64_value();
if (!this->rdma_max_recv)
this->rdma_max_recv = 16;
this->rdma_max_recv = 128;
this->rdma_max_msg = config["rdma_max_msg"].uint64_value();
if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024)
this->rdma_max_msg = 129*1024;

View File

@@ -353,10 +353,8 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
.wr_id = (uint64_t)(cl->peer_fd*2+1),
.sg_list = sge,
.num_sge = op_sge,
.opcode = cl->rdma_conn->avail_recv > 0 ? IBV_WR_SEND_WITH_IMM : IBV_WR_SEND,
.opcode = IBV_WR_SEND,
.send_flags = IBV_SEND_SIGNALED,
// Notify peer about our available incoming buffers
.imm_data = (uint32_t)cl->rdma_conn->avail_recv,
};
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
if (err || bad_wr)
@@ -365,27 +363,15 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
exit(1);
}
cl->rdma_conn->cur_send++;
cl->rdma_conn->avail_send--;
cl->rdma_conn->avail_recv = 0;
}
bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
{
auto rc = cl->rdma_conn;
if (rc->cur_send >= rc->max_send)
if (!cl->send_list.size() || rc->cur_send >= rc->max_send)
{
return true;
}
if (!cl->send_list.size() || rc->use_flow_control && rc->avail_send <= 0)
{
if (rc->avail_recv)
{
// Only notify about available buffers so 2 peers don't lock each other
rc->send_sizes.push_back(0);
try_send_rdma_wr(cl, NULL, 0);
}
return true;
}
uint64_t op_size = 0, op_sge = 0;
ibv_sge sge[rc->max_sge];
while (rc->send_pos < cl->send_list.size())
@@ -445,7 +431,6 @@ static void try_recv_rdma_wr(osd_client_t *cl, void *buf)
exit(1);
}
cl->rdma_conn->cur_recv++;
cl->rdma_conn->avail_recv++;
}
bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
@@ -507,11 +492,6 @@ void osd_messenger_t::handle_rdma_events()
if (!is_send)
{
rc->cur_recv--;
if ((wc[i].wc_flags & IBV_WC_WITH_IMM) && wc[i].imm_data > 0)
{
rc->avail_send += wc[i].imm_data;
rc->use_flow_control = true;
}
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len))
{
// handle_read_buffer may stop the client
@@ -519,7 +499,6 @@ void osd_messenger_t::handle_rdma_events()
}
try_recv_rdma_wr(cl, rc->recv_buffers[rc->next_recv_buf]);
rc->next_recv_buf = (rc->next_recv_buf+1) % rc->recv_buffers.size();
try_send_rdma(cl);
}
else
{

View File

@@ -45,8 +45,7 @@ struct msgr_rdma_connection_t
ibv_qp *qp = NULL;
msgr_rdma_address_t addr;
int max_send = 0, max_recv = 0, max_sge = 0;
int cur_send = 0, cur_recv = 0, avail_recv = 0, avail_send = 0;
bool use_flow_control = false;
int cur_send = 0, cur_recv = 0;
uint64_t max_msg = 0;
int send_pos = 0, send_buf_pos = 0;

View File

@@ -44,10 +44,9 @@ osd_t::osd_t(const json11::Json & config, ring_loop_t *ringloop)
// FIXME: Use timerfd_interval based directly on io_uring
this->tfd = epmgr->tfd;
if (!json_is_true(this->config["disable_blockstore"]))
auto bs_cfg = json_to_bs(this->config);
this->bs = new blockstore_t(bs_cfg, ringloop, tfd);
{
auto bs_cfg = json_to_bs(this->config);
this->bs = new blockstore_t(bs_cfg, ringloop, tfd);
// Autosync based on the number of unstable writes to prevent stalls due to insufficient journal space
uint64_t max_autosync = bs->get_journal_size() / bs->get_block_size() / 2;
if (autosync_writes > max_autosync)
@@ -94,8 +93,7 @@ osd_t::~osd_t()
{
ringloop->unregister_consumer(&consumer);
delete epmgr;
if (bs)
delete bs;
delete bs;
close(listen_fd);
free(zero_buffer);
}
@@ -180,6 +178,16 @@ void osd_t::parse_config(const json11::Json & config, bool allow_disk_params)
inode_vanish_time = config["inode_vanish_time"].uint64_value();
if (!inode_vanish_time)
inode_vanish_time = 60;
global_scrub_interval = config["scrub_interval"].uint64_value();
if (!global_scrub_interval)
global_scrub_interval = 30*86400;
scrub_queue_depth = config["scrub_queue_depth"].uint64_value();
if (scrub_queue_depth < 1 || scrub_queue_depth > MAX_RECOVERY_QUEUE)
scrub_queue_depth = 1;
scrub_sleep_ms = config["scrub_sleep"].uint64_value();
scrub_list_limit = config["scrub_list_limit"].uint64_value();
if (!scrub_list_limit)
scrub_list_limit = 1000;
}
void osd_t::bind_socket()
@@ -264,7 +272,8 @@ void osd_t::exec_op(osd_op_t *cur_op)
cur_op->req.hdr.opcode == OSD_OP_DELETE) &&
(cur_op->req.rw.len > OSD_RW_MAX ||
cur_op->req.rw.len % bs_bitmap_granularity ||
cur_op->req.rw.offset % bs_bitmap_granularity)))
cur_op->req.rw.offset % bs_bitmap_granularity)) ||
cur_op->req.hdr.opcode == OSD_OP_SCRUB && cur_op->peer_fd != -1)
{
// Bad command
finish_op(cur_op, -EINVAL);
@@ -281,6 +290,7 @@ void osd_t::exec_op(osd_op_t *cur_op)
cur_op->req.hdr.opcode != OSD_OP_SEC_LIST &&
cur_op->req.hdr.opcode != OSD_OP_READ &&
cur_op->req.hdr.opcode != OSD_OP_SEC_READ_BMP &&
cur_op->req.hdr.opcode != OSD_OP_SCRUB &&
cur_op->req.hdr.opcode != OSD_OP_SHOW_CONFIG)
{
// Readonly mode
@@ -311,6 +321,10 @@ void osd_t::exec_op(osd_op_t *cur_op)
{
continue_primary_del(cur_op);
}
else if (cur_op->req.hdr.opcode == OSD_OP_SCRUB)
{
continue_primary_scrub(cur_op);
}
else
{
exec_secondary(cur_op);
@@ -375,6 +389,10 @@ void osd_t::print_stats()
recovery_stat_bytes[1][i] = recovery_stat_bytes[0][i];
}
}
if (corrupted_objects > 0)
{
printf("[OSD %lu] %lu object(s) corrupted\n", osd_num, corrupted_objects);
}
if (incomplete_objects > 0)
{
printf("[OSD %lu] %lu object(s) incomplete\n", osd_num, incomplete_objects);
@@ -442,10 +460,11 @@ void osd_t::print_slow()
else if (op->req.hdr.opcode == OSD_OP_SEC_LIST)
{
bufprintf(
" inode=%lx-%lx pg=%u/%u, stripe=%lu",
op->req.sec_list.min_inode, op->req.sec_list.max_inode,
" oid=%lx/%lx-%lx/%lx pg=%u/%u, stripe=%lu, limit=%u",
op->req.sec_list.min_inode, op->req.sec_list.min_stripe,
op->req.sec_list.max_inode, op->req.sec_list.max_stripe,
op->req.sec_list.list_pg, op->req.sec_list.pg_count,
op->req.sec_list.pg_stripe_size
op->req.sec_list.pg_stripe_size, op->req.sec_list.stable_limit
);
}
else if (op->req.hdr.opcode == OSD_OP_READ || op->req.hdr.opcode == OSD_OP_WRITE ||
@@ -477,7 +496,7 @@ void osd_t::print_slow()
}
}
}
if (has_slow && bs)
if (has_slow)
{
bs->dump_diagnostics();
}

View File

@@ -28,6 +28,7 @@
#define OSD_PEERING_PGS 0x04
#define OSD_FLUSHING_PGS 0x08
#define OSD_RECOVERING 0x10
#define OSD_SCRUBBING 0x20
#define MAX_AUTOSYNC_INTERVAL 3600
#define DEFAULT_AUTOSYNC_INTERVAL 5
@@ -113,6 +114,10 @@ class osd_t
int recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
int inode_vanish_time = 60;
int log_level = 0;
uint64_t global_scrub_interval = 30*86400;
uint64_t scrub_queue_depth = 1;
uint64_t scrub_sleep_ms = 0;
uint32_t scrub_list_limit = 1000;
// cluster state
@@ -134,15 +139,24 @@ class osd_t
std::set<pool_pg_num_t> dirty_pgs;
std::set<osd_num_t> dirty_osds;
int copies_to_delete_after_sync_count = 0;
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0, corrupted_objects = 0;
int peering_state = 0;
std::map<object_id, osd_recovery_op_t> recovery_ops;
std::map<object_id, osd_op_t*> scrub_ops;
bool recovery_last_degraded = true;
pool_pg_num_t recovery_last_pg;
object_id recovery_last_oid;
int recovery_pg_done = 0, recovery_done = 0;
osd_op_t *autosync_op = NULL;
// Scrubbing
uint64_t scrub_nearest_ts = 0;
int scrub_timer_id = -1;
pool_pg_num_t scrub_last_pg;
osd_op_t *scrub_list_op;
pg_list_result_t scrub_cur_list = {};
uint64_t scrub_list_pos = 0;
// Unstable writes
uint64_t unstable_write_count = 0;
std::map<osd_object_id_t, uint64_t> unstable_writes;
@@ -152,7 +166,7 @@ class osd_t
bool stopping = false;
int inflight_ops = 0;
blockstore_t *bs = NULL;
blockstore_t *bs;
void *zero_buffer = NULL;
uint64_t zero_buffer_size = 0;
uint32_t bs_block_size, bs_bitmap_granularity, clean_entry_bitmap_size;
@@ -220,6 +234,13 @@ class osd_t
bool continue_recovery();
pg_osd_set_state_t* change_osd_set(pg_osd_set_state_t *st, pg_t *pg);
// scrub
void scrub_list(pool_pg_num_t pg_id, osd_num_t role_osd, object_id min_oid);
bool pick_next_scrub(object_id & next_oid);
void submit_scrub_op(object_id oid);
bool continue_scrub();
void schedule_scrub(pg_t & pg);
// op execution
void exec_op(osd_op_t *cur_op);
void finish_op(osd_op_t *cur_op, int retval);
@@ -234,13 +255,15 @@ class osd_t
void autosync();
bool prepare_primary_rw(osd_op_t *cur_op);
void continue_primary_read(osd_op_t *cur_op);
void continue_primary_scrub(osd_op_t *cur_op);
void continue_primary_write(osd_op_t *cur_op);
void cancel_primary_write(osd_op_t *cur_op);
void continue_primary_sync(osd_op_t *cur_op);
void continue_primary_del(osd_op_t *cur_op);
bool check_write_queue(osd_op_t *cur_op, pg_t & pg);
void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg);
void free_object_state(pg_t & pg, pg_osd_set_state_t **object_state);
void remove_object_from_state(object_id & oid, pg_osd_set_state_t **object_state, pg_t &pg, bool report = true);
pg_osd_set_state_t *mark_object_corrupted(pg_t & pg, object_id oid, pg_osd_set_state_t *prev_object_state, osd_rmw_stripe_t *stripes, bool ref);
void deref_object_state(pg_t & pg, pg_osd_set_state_t **object_state, bool deref);
bool remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state);
void handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op);
void handle_primary_bs_subop(osd_op_t *subop);
@@ -255,10 +278,11 @@ class osd_t
int submit_primary_sync_subops(osd_op_t *cur_op);
void submit_primary_stab_subops(osd_op_t *cur_op);
uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state);
uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, pg_osd_set_state_t **object_state);
void continue_chained_read(osd_op_t *cur_op);
int submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op);
void check_corrupted_chained(pg_t & pg, osd_op_t *cur_op);
void send_chained_read_results(pg_t & pg, osd_op_t *cur_op);
std::vector<osd_chain_read_t> collect_chained_read_requests(osd_op_t *cur_op);
int collect_bitmap_requests(osd_op_t *cur_op, pg_t & pg, std::vector<bitmap_request_t> & bitmap_requests);

View File

@@ -182,10 +182,10 @@ json11::Json osd_t::get_statistics()
char time_str[50] = { 0 };
sprintf(time_str, "%ld.%03ld", ts.tv_sec, ts.tv_nsec/1000000);
st["time"] = time_str;
st["blockstore_ready"] = bs->is_started();
st["data_block_size"] = (uint64_t)bs->get_block_size();
if (bs)
{
st["blockstore_ready"] = bs->is_started();
st["data_block_size"] = (uint64_t)bs->get_block_size();
st["size"] = bs->get_block_count() * bs->get_block_size();
st["free"] = bs->get_free_block_count() * bs->get_block_size();
}
@@ -233,8 +233,7 @@ void osd_t::report_statistics()
json11::Json::object inode_space;
json11::Json::object last_stat;
pool_id_t last_pool = 0;
std::map<uint64_t, uint64_t> bs_empty_space;
auto & bs_inode_space = bs ? bs->get_inode_space_stats() : bs_empty_space;
auto & bs_inode_space = bs->get_inode_space_stats();
for (auto kv: bs_inode_space)
{
pool_id_t pool_id = INODE_POOL(kv.first);
@@ -337,6 +336,8 @@ void osd_t::report_statistics()
pg_stats["misplaced_count"] = pg.misplaced_objects.size();
pg_stats["degraded_count"] = pg.degraded_objects.size();
pg_stats["incomplete_count"] = pg.incomplete_objects.size();
if (pg.corrupted_count)
pg_stats["corrupted_count"] = pg.corrupted_count;
pg_stats["write_osd_set"] = pg.cur_set;
txn.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
@@ -691,6 +692,12 @@ void osd_t::apply_pg_config()
pg_it->second.all_peers == vec_all_peers)
{
// No change in osd_set and history
if (pg_it->second.scrub_ts != pg_cfg.scrub_ts)
{
pg_it->second.scrub_ts = pg_cfg.scrub_ts;
peering_state = peering_state | OSD_SCRUBBING;
ringloop->wakeup();
}
continue;
}
else
@@ -742,6 +749,7 @@ void osd_t::apply_pg_config()
.reported_epoch = pg_cfg.epoch,
.target_history = pg_cfg.target_history,
.all_peers = vec_all_peers,
.scrub_ts = pg_cfg.scrub_ts,
.target_set = pg_cfg.target_set,
};
if (pg.scheme == POOL_SCHEME_EC)
@@ -872,6 +880,8 @@ void osd_t::report_pg_states()
{ "all_peers", pg.all_peers },
{ "osd_sets", pg.target_history },
};
if (pg.scrub_ts)
history_value["scrub_ts"] = pg.scrub_ts;
checks.push_back(json11::Json::object {
{ "target", "MOD" },
{ "key", history_key },

View File

@@ -182,7 +182,9 @@ bool osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
op->bs_op = NULL;
delete op;
},
.len = (uint32_t)count,
{
.len = (uint32_t)count,
},
.buf = op->buf,
});
bs->enqueue_op(op->bs_op);
@@ -300,19 +302,17 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op)
if (osd_op->reply.hdr.retval < 0)
{
// Error recovering object
if (osd_op->reply.hdr.retval == -EPIPE)
{
// PG is stopped or one of the OSDs is gone, error is harmless
printf(
"Recovery operation failed with object %lx:%lx (PG %u/%u)\n",
op->oid.inode, op->oid.stripe, INODE_POOL(op->oid.inode),
map_to_pg(op->oid, st_cli.pool_config.at(INODE_POOL(op->oid.inode)).pg_stripe_size)
);
}
else
{
throw std::runtime_error("Failed to recover an object");
}
// EPIPE is totally harmless (peer is gone), others like EIO/EDOM may be not
printf(
"Recovery operation failed with object %lx:%lx (PG %u/%u): error %ld\n",
op->oid.inode, op->oid.stripe, INODE_POOL(op->oid.inode),
map_to_pg(op->oid, st_cli.pool_config.at(INODE_POOL(op->oid.inode)).pg_stripe_size),
osd_op->reply.hdr.retval
);
}
else if (log_level > 2)
{
printf("Recovery operation done for %lx:%lx\n", op->oid.inode, op->oid.stripe);
}
// CAREFUL! op = &recovery_ops[op->oid]. Don't access op->* after recovery_ops.erase()
op->osd_op = NULL;

View File

@@ -29,7 +29,8 @@
#define OSD_OP_DELETE 14
#define OSD_OP_PING 15
#define OSD_OP_SEC_READ_BMP 16
#define OSD_OP_MAX 16
#define OSD_OP_SCRUB 17
#define OSD_OP_MAX 17
#define OSD_RW_MAX 64*1024*1024
#define OSD_PROTOCOL_VERSION 1
@@ -173,6 +174,11 @@ struct __attribute__((__packed__)) osd_op_sec_list_t
uint64_t pg_stripe_size;
// inode range (used to select pools)
uint64_t min_inode, max_inode;
// min/max oid stripe, added after inodes for backwards compatibility
// also for backwards compatibility, max_stripe=UINT64_MAX means 0 and 0 means UINT64_MAX O_o
uint64_t min_stripe, max_stripe;
// max stable object count
uint32_t stable_limit;
};
struct __attribute__((__packed__)) osd_reply_sec_list_t

View File

@@ -24,6 +24,7 @@ void osd_t::handle_peers()
if (!p.second.peering_state->list_ops.size())
{
p.second.calc_object_states(log_level);
schedule_scrub(p.second);
report_pg_state(p.second);
incomplete_objects += p.second.incomplete_objects.size();
misplaced_objects += p.second.misplaced_objects.size();
@@ -83,6 +84,13 @@ void osd_t::handle_peers()
peering_state = peering_state & ~OSD_RECOVERING;
}
}
if (peering_state & OSD_SCRUBBING)
{
if (!continue_scrub())
{
peering_state = peering_state & ~OSD_SCRUBBING;
}
}
}
void osd_t::repeer_pgs(osd_num_t peer_osd)
@@ -128,9 +136,11 @@ void osd_t::reset_pg(pg_t & pg)
pg.state_dict.clear();
copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
pg.copies_to_delete_after_sync.clear();
corrupted_objects -= pg.corrupted_count;
incomplete_objects -= pg.incomplete_objects.size();
misplaced_objects -= pg.misplaced_objects.size();
degraded_objects -= pg.degraded_objects.size();
pg.corrupted_count = 0;
pg.incomplete_objects.clear();
pg.misplaced_objects.clear();
pg.degraded_objects.clear();
@@ -206,7 +216,7 @@ void osd_t::start_pg_peering(pg_t & pg)
pg.cur_loc_set.push_back({
.role = (uint64_t)role,
.osd_num = pg.cur_set[role],
.outdated = false,
.loc_bad = 0,
});
}
}
@@ -319,11 +329,12 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
clock_gettime(CLOCK_REALTIME, &op->tv_begin);
op->bs_op = new blockstore_op_t();
op->bs_op->opcode = BS_OP_LIST;
op->bs_op->oid.stripe = st_cli.pool_config[ps->pool_id].pg_stripe_size;
op->bs_op->oid.inode = ((uint64_t)ps->pool_id << (64 - POOL_ID_BITS));
op->bs_op->version = ((uint64_t)(ps->pool_id+1) << (64 - POOL_ID_BITS)) - 1;
op->bs_op->len = pg_counts[ps->pool_id];
op->bs_op->offset = ps->pg_num-1;
op->bs_op->pg_alignment = st_cli.pool_config[ps->pool_id].pg_stripe_size;
op->bs_op->min_oid.inode = ((uint64_t)ps->pool_id << (64 - POOL_ID_BITS));
op->bs_op->max_oid.inode = ((uint64_t)(ps->pool_id+1) << (64 - POOL_ID_BITS)) - 1;
op->bs_op->max_oid.stripe = UINT64_MAX;
op->bs_op->pg_count = pg_counts[ps->pool_id];
op->bs_op->pg_number = ps->pg_num-1;
op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op)
{
if (op->bs_op->retval < 0)

View File

@@ -280,7 +280,7 @@ void pg_obj_state_check_t::finish_object()
osd_set.push_back((pg_obj_loc_t){
.role = (list[i].oid.stripe & STRIPE_MASK),
.osd_num = list[i].osd_num,
.outdated = false,
.loc_bad = 0,
});
}
}
@@ -302,7 +302,7 @@ void pg_obj_state_check_t::finish_object()
osd_set.push_back((pg_obj_loc_t){
.role = (list[i].oid.stripe & STRIPE_MASK),
.osd_num = list[i].osd_num,
.outdated = true,
.loc_bad = LOC_OUTDATED,
});
if (!(state & (OBJ_INCOMPLETE | OBJ_DEGRADED)))
{
@@ -322,67 +322,73 @@ void pg_obj_state_check_t::finish_object()
}
else
{
auto it = pg->state_dict.find(osd_set);
if (it == pg->state_dict.end())
{
std::vector<uint64_t> read_target;
if (replicated)
{
for (auto & o: osd_set)
{
if (!o.outdated)
{
read_target.push_back(o.osd_num);
}
}
while (read_target.size() < pg->pg_size)
{
// FIXME: This is because we then use .data() and assume it's at least <pg_size> long
read_target.push_back(0);
}
}
else
{
read_target.resize(pg->pg_size);
for (int i = 0; i < pg->pg_size; i++)
{
read_target[i] = 0;
}
for (auto & o: osd_set)
{
if (!o.outdated)
{
read_target[o.role] = o.osd_num;
}
}
}
pg->state_dict[osd_set] = {
.read_target = read_target,
.osd_set = osd_set,
.state = state,
.object_count = 1,
};
it = pg->state_dict.find(osd_set);
}
else
{
it->second.object_count++;
}
if (state & OBJ_INCOMPLETE)
{
pg->incomplete_objects[oid] = &it->second;
}
else if (state & OBJ_DEGRADED)
{
pg->degraded_objects[oid] = &it->second;
}
else
{
pg->misplaced_objects[oid] = &it->second;
}
pg->add_object_to_state(oid, state, osd_set);
}
}
pg_osd_set_state_t* pg_t::add_object_to_state(const object_id oid, const uint64_t state, const pg_osd_set_t & osd_set)
{
auto it = state_dict.find(osd_set);
if (it == state_dict.end())
{
std::vector<osd_num_t> read_target;
if (scheme == POOL_SCHEME_REPLICATED)
{
for (auto & o: osd_set)
{
if (!o.loc_bad)
{
read_target.push_back(o.osd_num);
}
}
while (read_target.size() < pg_size)
{
// FIXME: This is because we then use .data() and assume it's at least <pg_size> long
read_target.push_back(0);
}
}
else
{
read_target.resize(pg_size);
for (int i = 0; i < pg_size; i++)
{
read_target[i] = 0;
}
for (auto & o: osd_set)
{
if (!o.loc_bad)
{
read_target[o.role] = o.osd_num;
}
}
}
state_dict[osd_set] = {
.read_target = read_target,
.osd_set = osd_set,
.state = state,
.object_count = 1,
};
it = state_dict.find(osd_set);
}
else
{
it->second.object_count++;
}
if (state & OBJ_INCOMPLETE)
{
incomplete_objects[oid] = &it->second;
}
else if (state & OBJ_DEGRADED)
{
degraded_objects[oid] = &it->second;
}
else
{
misplaced_objects[oid] = &it->second;
}
return &it->second;
}
// FIXME: Write at least some tests for this function
void pg_t::calc_object_states(int log_level)
{
@@ -446,7 +452,8 @@ void pg_t::calc_object_states(int log_level)
osd_set_desc += (osd_set_desc == "" ? "" : ", ")+
std::to_string(loc.osd_num)+
(st.replicated ? "" : "("+std::to_string(loc.role)+")")+
(loc.outdated ? "(old)" : "");
(loc.loc_bad & LOC_OUTDATED ? "(old)" : "")+
(loc.loc_bad & LOC_CORRUPTED ? "(corrupted)" : "");
}
printf("[PG %u/%u] %lu objects on OSD set %s\n", pool_id, pg_num, stp.second.object_count, osd_set_desc.c_str());
}
@@ -456,7 +463,7 @@ void pg_t::calc_object_states(int log_level)
void pg_t::print_state()
{
printf(
"[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num,
"[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num,
(state & PG_STARTING) ? "starting" : "",
(state & PG_OFFLINE) ? "offline" : "",
(state & PG_PEERING) ? "peering" : "",
@@ -465,12 +472,14 @@ void pg_t::print_state()
(state & PG_REPEERING) ? "repeering" : "",
(state & PG_STOPPING) ? "stopping" : "",
(state & PG_DEGRADED) ? " + degraded" : "",
(state & PG_HAS_CORRUPTED) ? " + has_corrupted" : "",
(state & PG_HAS_INCOMPLETE) ? " + has_incomplete" : "",
(state & PG_HAS_DEGRADED) ? " + has_degraded" : "",
(state & PG_HAS_MISPLACED) ? " + has_misplaced" : "",
(state & PG_HAS_UNCLEAN) ? " + has_unclean" : "",
(state & PG_HAS_INVALID) ? " + has_invalid" : "",
(state & PG_LEFT_ON_DEAD) ? " + left_on_dead" : "",
(state & PG_SCRUBBING) ? " + scrubbing" : "",
total_count
);
}

View File

@@ -13,11 +13,14 @@
#define PG_EPOCH_BITS 48
#define LOC_OUTDATED 1
#define LOC_CORRUPTED 2
struct pg_obj_loc_t
{
uint64_t role;
osd_num_t osd_num;
bool outdated;
uint32_t loc_bad; // LOC_OUTDATED / LOC_CORRUPTED
};
typedef std::vector<pg_obj_loc_t> pg_osd_set_t;
@@ -30,6 +33,7 @@ struct pg_osd_set_state_t
pg_osd_set_t osd_set;
uint64_t state = 0;
uint64_t object_count = 0;
uint64_t ref_count = 0;
};
struct pg_list_result_t
@@ -91,6 +95,8 @@ struct pg_t
// target history and all potential peers
std::vector<std::vector<osd_num_t>> target_history;
std::vector<osd_num_t> all_peers;
// last scrub time
uint64_t scrub_ts = 0;
bool history_changed = false;
// peer list from the last peering event
std::vector<osd_num_t> cur_peers;
@@ -106,6 +112,7 @@ struct pg_t
// it may consume up to ~ (raw storage / object size) * 24 bytes in the worst case scenario
// which is up to ~192 MB per 1 TB in the worst case scenario
std::map<pg_osd_set_t, pg_osd_set_state_t> state_dict;
uint64_t corrupted_count;
btree::btree_map<object_id, pg_osd_set_state_t*> incomplete_objects, misplaced_objects, degraded_objects;
std::map<obj_piece_id_t, flush_action_t> flush_actions;
std::vector<obj_ver_osd_t> copies_to_delete_after_sync;
@@ -116,15 +123,16 @@ struct pg_t
int inflight = 0; // including write_queue
std::multimap<object_id, osd_op_t*> write_queue;
pg_osd_set_state_t* add_object_to_state(const object_id oid, const uint64_t state, const pg_osd_set_t & osd_set);
void calc_object_states(int log_level);
void print_state();
};
inline bool operator < (const pg_obj_loc_t &a, const pg_obj_loc_t &b)
{
return a.outdated < b.outdated ||
a.outdated == b.outdated && a.role < b.role ||
a.outdated == b.outdated && a.role == b.role && a.osd_num < b.osd_num;
return a.loc_bad < b.loc_bad ||
a.loc_bad == b.loc_bad && a.role < b.role ||
a.loc_bad == b.loc_bad && a.role == b.role && a.osd_num < b.osd_num;
}
inline bool operator == (const obj_piece_id_t & a, const obj_piece_id_t & b)

View File

@@ -52,7 +52,9 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
finish_op(cur_op, -EINVAL);
return false;
}
int stripe_count = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_size);
// Scrub is similar to r/w, so it's also handled here
int stripe_count = (pool_cfg.scheme == POOL_SCHEME_REPLICATED
&& cur_op->req.hdr.opcode != OSD_OP_SCRUB ? 1 : pg_it->second.pg_size);
int chain_size = 0;
if (cur_op->req.hdr.opcode == OSD_OP_READ && cur_op->req.rw.meta_revision > 0)
{
@@ -90,6 +92,8 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
chain_size * (
// - copy of the chain
sizeof(inode_t) +
// - object states for every chain item
sizeof(void*) +
// - bitmap buffers for chained read
stripe_count * clean_entry_bitmap_size +
// - 'missing' flags for chained reads
@@ -117,6 +121,8 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
{
op_data->read_chain = (inode_t*)data_buf;
data_buf = (uint8_t*)data_buf + sizeof(inode_t) * chain_size;
op_data->chain_states = (pg_osd_set_state_t**)data_buf;
data_buf = (uint8_t*)data_buf + sizeof(pg_osd_set_state_t*) * chain_size;
op_data->snapshot_bitmaps = data_buf;
data_buf = (uint8_t*)data_buf + chain_size * stripe_count * clean_entry_bitmap_size;
op_data->missing_flags = (uint8_t*)data_buf;
@@ -131,6 +137,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
inode_it->second.parent_id != cur_op->req.rw.inode)
{
op_data->read_chain[chain_num++] = inode_it->second.parent_id;
op_data->chain_states[chain_num++] = NULL;
inode_it = st_cli.inode_config.find(inode_it->second.parent_id);
}
}
@@ -138,12 +145,12 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
return true;
}
uint64_t* osd_t::get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state)
uint64_t* osd_t::get_object_osd_set(pg_t &pg, object_id &oid, pg_osd_set_state_t **object_state)
{
if (!(pg.state & (PG_HAS_INCOMPLETE | PG_HAS_DEGRADED | PG_HAS_MISPLACED)))
{
*object_state = NULL;
return def;
return pg.cur_set.data();
}
auto st_it = pg.incomplete_objects.find(oid);
if (st_it != pg.incomplete_objects.end())
@@ -164,7 +171,7 @@ uint64_t* osd_t::get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_
return st_it->second->read_target.data();
}
*object_state = NULL;
return def;
return pg.cur_set.data();
}
void osd_t::continue_primary_read(osd_op_t *cur_op)
@@ -183,6 +190,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
goto resume_1;
else if (op_data->st == 2)
goto resume_2;
resume_0:
cur_op->reply.rw.bitmap_len = 0;
{
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
@@ -194,15 +202,17 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
// Determine version
auto vo_it = pg.ver_override.find(op_data->oid);
op_data->target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
op_data->prev_set = pg.cur_set.data();
if (pg.state != PG_ACTIVE)
{
// PG may be degraded or have misplaced objects
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
}
// PG may have degraded or misplaced objects
op_data->prev_set = get_object_osd_set(pg, op_data->oid, &op_data->object_state);
if (pg.state == PG_ACTIVE || op_data->scheme == POOL_SCHEME_REPLICATED)
{
// Fast happy-path
if (op_data->scheme == POOL_SCHEME_REPLICATED &&
op_data->object_state && (op_data->object_state->state & OBJ_INCOMPLETE))
{
finish_op(cur_op, -EIO);
return;
}
cur_op->buf = alloc_read_buffer(op_data->stripes, op_data->pg_data_size, 0);
submit_primary_subops(SUBMIT_RMW_READ, op_data->target_ver, op_data->prev_set, cur_op);
op_data->st = 1;
@@ -228,6 +238,14 @@ resume_1:
resume_2:
if (op_data->errors > 0)
{
if (op_data->errcode == -EIO || op_data->errcode == -EDOM)
{
// I/O or checksum error
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
// FIXME: ref = true ideally... because new_state != state is not necessarily true if it's freed and recreated
op_data->object_state = mark_object_corrupted(pg, op_data->oid, op_data->object_state, op_data->stripes, false);
goto resume_0;
}
finish_op(cur_op, op_data->errcode);
return;
}
@@ -266,10 +284,144 @@ resume_2:
finish_op(cur_op, cur_op->req.rw.len);
}
// Decrement pg_osd_set_state_t's object_count and change PG state accordingly
void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t & pg)
pg_osd_set_state_t *osd_t::mark_object_corrupted(pg_t & pg, object_id oid, pg_osd_set_state_t *prev_object_state, osd_rmw_stripe_t *stripes, bool ref)
{
if (object_state->state & OBJ_INCOMPLETE)
pg_osd_set_state_t *object_state = NULL;
get_object_osd_set(pg, oid, &object_state);
if (prev_object_state != object_state)
{
// Object state changed in between by a parallel I/O operation, skip marking as failed
if (ref)
{
deref_object_state(pg, &prev_object_state, ref);
if (object_state)
object_state->ref_count++;
}
return object_state;
}
pg_osd_set_t corrupted_set;
if (object_state)
{
corrupted_set = object_state->osd_set;
}
else
{
for (int i = 0; i < pg.cur_set.size(); i++)
{
corrupted_set.push_back((pg_obj_loc_t){
.role = (pg.scheme == POOL_SCHEME_REPLICATED ? 0 : (uint64_t)i),
.osd_num = pg.cur_set[i],
});
}
}
// Mark object chunk(s) as corrupted
uint64_t has_roles = 0, n_roles = 0, n_copies = 0, n_corrupted = 0;
for (auto & chunk: corrupted_set)
{
bool corrupted = stripes[chunk.role].osd_num == chunk.osd_num && stripes[chunk.role].read_error;
if (corrupted && !(chunk.loc_bad & LOC_CORRUPTED))
n_corrupted++;
chunk.loc_bad = chunk.loc_bad | (corrupted ? LOC_CORRUPTED : 0);
if (!chunk.loc_bad)
{
if (pg.scheme == POOL_SCHEME_REPLICATED)
n_roles = 1;
else if (!(has_roles & (1 << chunk.role)))
{
n_roles++;
has_roles |= (1 << chunk.role);
}
n_copies++;
}
}
if (!n_corrupted)
{
// No chunks newly marked as corrupted - object is already marked or moved
return object_state;
}
int old_pg_state = pg.state;
if (object_state)
{
remove_object_from_state(oid, &object_state, pg, false);
deref_object_state(pg, &object_state, ref);
}
// Calculate object state
uint64_t obj_state = OBJ_CORRUPTED;
int pg_state_bits = PG_HAS_CORRUPTED;
this->corrupted_objects++;
pg.corrupted_count++;
if (log_level > 1)
{
printf("Marking object %lx:%lx corrupted: %lu chunks / %lu copies available, %lu corrupted\n",
oid.inode, oid.stripe, n_roles, n_copies, n_corrupted);
}
if (n_roles < pg.pg_data_size)
{
this->incomplete_objects++;
obj_state |= OBJ_INCOMPLETE;
pg_state_bits = PG_HAS_INCOMPLETE;
}
else if (n_roles < pg.pg_cursize)
{
this->degraded_objects++;
obj_state |= OBJ_DEGRADED;
pg_state_bits = PG_HAS_DEGRADED;
}
else
{
this->misplaced_objects++;
obj_state |= OBJ_MISPLACED;
pg_state_bits = PG_HAS_MISPLACED;
}
pg.state |= pg_state_bits;
if (pg.state != old_pg_state)
{
report_pg_state(pg);
if ((pg.state & (PG_HAS_DEGRADED | PG_HAS_MISPLACED)) !=
(old_pg_state & (PG_HAS_DEGRADED | PG_HAS_MISPLACED)))
{
peering_state = peering_state | OSD_RECOVERING;
if ((pg.state & PG_HAS_DEGRADED) != (old_pg_state & PG_HAS_DEGRADED))
{
// Restart recovery from degraded objects
recovery_last_degraded = true;
recovery_last_pg = {};
recovery_last_oid = {};
}
ringloop->wakeup();
}
}
// Insert object into the new state and retry
object_state = pg.add_object_to_state(oid, obj_state, corrupted_set);
if (ref)
object_state->ref_count++;
return object_state;
}
// Decrement pg_osd_set_state_t's object_count and change PG state accordingly
void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t **object_state, pg_t & pg, bool report)
{
if (!*object_state)
{
return;
}
pg_osd_set_state_t *recheck_state = NULL;
get_object_osd_set(pg, oid, &recheck_state);
if (recheck_state != *object_state)
{
recheck_state->ref_count++;
(*object_state)->ref_count--;
*object_state = recheck_state;
return;
}
(*object_state)->object_count--;
if ((*object_state)->state & OBJ_CORRUPTED)
{
this->corrupted_objects--;
pg.corrupted_count--;
}
bool changed = false;
if ((*object_state)->state & OBJ_INCOMPLETE)
{
// Successful write means that object is not incomplete anymore
this->incomplete_objects--;
@@ -277,41 +429,52 @@ void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object
if (!pg.incomplete_objects.size())
{
pg.state = pg.state & ~PG_HAS_INCOMPLETE;
report_pg_state(pg);
changed = true;
}
}
else if (object_state->state & OBJ_DEGRADED)
else if ((*object_state)->state & OBJ_DEGRADED)
{
this->degraded_objects--;
pg.degraded_objects.erase(oid);
if (!pg.degraded_objects.size())
{
pg.state = pg.state & ~PG_HAS_DEGRADED;
report_pg_state(pg);
changed = true;
}
}
else if (object_state->state & OBJ_MISPLACED)
else if ((*object_state)->state & OBJ_MISPLACED)
{
this->misplaced_objects--;
pg.misplaced_objects.erase(oid);
if (!pg.misplaced_objects.size())
{
pg.state = pg.state & ~PG_HAS_MISPLACED;
report_pg_state(pg);
changed = true;
}
}
else
{
throw std::runtime_error("BUG: Invalid object state: "+std::to_string(object_state->state));
throw std::runtime_error("BUG: Invalid object state: "+std::to_string((*object_state)->state));
}
if (changed && report)
{
report_pg_state(pg);
}
}
void osd_t::free_object_state(pg_t & pg, pg_osd_set_state_t **object_state)
void osd_t::deref_object_state(pg_t & pg, pg_osd_set_state_t **object_state, bool deref)
{
if (*object_state && !(--(*object_state)->object_count))
if (*object_state)
{
pg.state_dict.erase((*object_state)->osd_set);
*object_state = NULL;
if (deref)
{
(*object_state)->ref_count--;
}
if (!(*object_state)->object_count && !(*object_state)->ref_count)
{
pg.state_dict.erase((*object_state)->osd_set);
*object_state = NULL;
}
}
}
@@ -341,21 +504,28 @@ void osd_t::continue_primary_del(osd_op_t *cur_op)
}
resume_1:
// Determine which OSDs contain this object and delete it
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
op_data->prev_set = get_object_osd_set(pg, op_data->oid, &op_data->object_state);
if (op_data->object_state)
{
op_data->object_state->ref_count++;
}
// Submit 1 read to determine the actual version number
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op);
op_data->prev_set = NULL;
resume_2:
op_data->st = 2;
return;
resume_3:
if (op_data->errors > 0)
{
deref_object_state(pg, &op_data->object_state, true);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
return;
}
// Check CAS version
if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1))
{
deref_object_state(pg, &op_data->object_state, true);
cur_op->reply.hdr.retval = -EINTR;
cur_op->reply.rw.version = op_data->fact_ver;
goto continue_others;
@@ -371,6 +541,7 @@ resume_4:
resume_5:
if (op_data->errors > 0)
{
deref_object_state(pg, &op_data->object_state, true);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
return;
}
@@ -383,8 +554,8 @@ resume_5:
}
else
{
remove_object_from_state(op_data->oid, op_data->object_state, pg);
free_object_state(pg, &op_data->object_state);
remove_object_from_state(op_data->oid, &op_data->object_state, pg);
deref_object_state(pg, &op_data->object_state, true);
}
pg.total_count--;
cur_op->reply.hdr.retval = 0;

View File

@@ -9,6 +9,7 @@
#define SUBMIT_READ 0
#define SUBMIT_RMW_READ 1
#define SUBMIT_WRITE 2
#define SUBMIT_SCRUB_READ 3
struct unstable_osd_num_t
{
@@ -50,6 +51,7 @@ struct osd_primary_op_data_t
// for read_bitmaps
void *snapshot_bitmaps;
inode_t *read_chain;
pg_osd_set_state_t **chain_states;
uint8_t *missing_flags;
int chain_size;
osd_chain_read_t *chain_reads;

View File

@@ -40,10 +40,24 @@ resume_3:
resume_4:
if (op_data->errors > 0)
{
free(op_data->chain_reads);
op_data->chain_reads = NULL;
finish_op(cur_op, op_data->errcode);
return;
if (op_data->errcode == -EIO || op_data->errcode == -EDOM)
{
// Handle corrupted reads and retry...
check_corrupted_chained(pg, cur_op);
free(cur_op->buf);
cur_op->buf = NULL;
free(op_data->chain_reads);
op_data->chain_reads = NULL;
// FIXME: We can in theory retry only specific parts instead of the whole operation
goto resume_1;
}
else
{
free(op_data->chain_reads);
op_data->chain_reads = NULL;
finish_op(cur_op, op_data->errcode);
return;
}
}
send_chained_read_results(pg, cur_op);
finish_op(cur_op, cur_op->req.rw.len);
@@ -131,8 +145,7 @@ int osd_t::collect_bitmap_requests(osd_op_t *cur_op, pg_t & pg, std::vector<bitm
object_id cur_oid = { .inode = op_data->read_chain[chain_num], .stripe = op_data->oid.stripe };
auto vo_it = pg.ver_override.find(cur_oid);
uint64_t target_version = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
pg_osd_set_state_t *object_state;
uint64_t* cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state);
uint64_t* cur_set = get_object_osd_set(pg, cur_oid, &op_data->chain_states[chain_num]);
if (pg.scheme == POOL_SCHEME_REPLICATED)
{
osd_num_t read_target = 0;
@@ -247,6 +260,7 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
osd_op_t *subop = op_data->subops+subop_idx;
subop->op_type = OSD_OP_OUT;
// FIXME: Use the pre-allocated buffer
assert(!subop->buf);
subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev));
subop->req = (osd_any_op_t){
.sec_read_bmp = {
@@ -375,6 +389,8 @@ int osd_t::submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op)
op_data->chain_read_count = chain_reads.size();
op_data->chain_reads = (osd_chain_read_t*)calloc_or_die(
1, sizeof(osd_chain_read_t) * chain_reads.size()
// FIXME: Allocate only <chain_reads.size()> instead of <chain_size> stripes
// (but it's slightly harder to handle in send_chained_read_results())
+ sizeof(osd_rmw_stripe_t) * stripe_count * op_data->chain_size
);
osd_rmw_stripe_t *chain_stripes = (osd_rmw_stripe_t*)(
@@ -403,8 +419,7 @@ int osd_t::submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op)
uint64_t *cur_set = pg.cur_set.data();
if (pg.state != PG_ACTIVE)
{
pg_osd_set_state_t *object_state;
cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state);
cur_set = get_object_osd_set(pg, cur_oid, &op_data->chain_states[chain_reads[cri].chain_pos]);
if (op_data->scheme != POOL_SCHEME_REPLICATED)
{
if (extend_missing_stripes(stripes, cur_set, pg.pg_data_size, pg.pg_size) < 0)
@@ -416,6 +431,17 @@ int osd_t::submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op)
}
op_data->degraded = 1;
}
else
{
auto cur_state = op_data->chain_states[chain_reads[cri].chain_pos];
if (cur_state && (cur_state->state & OBJ_INCOMPLETE))
{
free(op_data->chain_reads);
op_data->chain_reads = NULL;
finish_op(cur_op, -EIO);
return -1;
}
}
}
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
@@ -433,6 +459,7 @@ int osd_t::submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op)
}
}
}
assert(!cur_op->buf);
cur_op->buf = memalign_or_die(MEM_ALIGNMENT, read_buffer_size);
void *cur_buf = cur_op->buf;
for (int cri = 0; cri < chain_reads.size(); cri++)
@@ -468,12 +495,8 @@ int osd_t::submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op)
object_id cur_oid = { .inode = chain_reads[cri].inode, .stripe = op_data->oid.stripe };
auto vo_it = pg.ver_override.find(cur_oid);
uint64_t target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
uint64_t *cur_set = pg.cur_set.data();
if (pg.state != PG_ACTIVE)
{
pg_osd_set_state_t *object_state;
cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state);
}
auto cur_state = op_data->chain_states[chain_reads[cri].chain_pos];
uint64_t *cur_set = (pg.state != PG_ACTIVE && cur_state ? cur_state->read_target.data() : pg.cur_set.data());
int zero_read = -1;
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
@@ -487,6 +510,33 @@ int osd_t::submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op)
return 0;
}
void osd_t::check_corrupted_chained(pg_t & pg, osd_op_t *cur_op)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
int stripe_count = (pg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size);
osd_rmw_stripe_t *chain_stripes = (osd_rmw_stripe_t*)(
(uint8_t*)op_data->chain_reads + sizeof(osd_chain_read_t) * op_data->chain_read_count
);
for (int cri = 0; cri < op_data->chain_read_count; cri++)
{
object_id cur_oid = { .inode = op_data->chain_reads[cri].inode, .stripe = op_data->oid.stripe };
osd_rmw_stripe_t *stripes = chain_stripes + op_data->chain_reads[cri].chain_pos*stripe_count;
bool corrupted = false;
for (int i = 0; i < stripe_count; i++)
{
if (stripes[i].read_error)
{
corrupted = true;
break;
}
}
if (corrupted)
{
mark_object_corrupted(pg, cur_oid, op_data->chain_states[op_data->chain_reads[cri].chain_pos], stripes, false);
}
}
}
void osd_t::send_chained_read_results(pg_t & pg, osd_op_t *cur_op)
{
osd_primary_op_data_t *op_data = cur_op->op_data;

View File

@@ -9,6 +9,7 @@ void osd_t::autosync()
{
autosync_op = new osd_op_t();
autosync_op->op_type = OSD_OP_IN;
autosync_op->peer_fd = -1;
autosync_op->req = (osd_any_op_t){
.sync = {
.header = {
@@ -53,10 +54,7 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval)
inode_stats[cur_op->req.rw.inode].op_count[inode_st_op]++;
inode_stats[cur_op->req.rw.inode].op_sum[inode_st_op] += usec;
if (cur_op->req.hdr.opcode == OSD_OP_DELETE)
{
if (cur_op->op_data)
inode_stats[cur_op->req.rw.inode].op_bytes[inode_st_op] += cur_op->op_data->pg_data_size * bs_block_size;
}
inode_stats[cur_op->req.rw.inode].op_bytes[inode_st_op] += cur_op->op_data->pg_data_size * bs_block_size;
else
inode_stats[cur_op->req.rw.inode].op_bytes[inode_st_op] += cur_op->req.rw.len;
}
@@ -142,34 +140,40 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
for (int role = 0; role < op_data->pg_size; role++)
{
// We always submit zero-length writes to all replicas, even if the stripe is not modified
if (!(wr || !rep && stripes[role].read_end != 0 || zero_read == role))
if (!(wr || !rep && stripes[role].read_end != 0 || zero_read == role || submit_type == SUBMIT_SCRUB_READ))
{
continue;
}
osd_num_t role_osd_num = osd_set[role];
int stripe_num = rep ? 0 : role;
if (role_osd_num != 0)
{
int stripe_num = rep ? 0 : role;
osd_op_t *subop = op_data->subops + i;
stripes[stripe_num].osd_num = role_osd_num;
stripes[stripe_num].read_error = false;
subop->bitmap = stripes[stripe_num].bmp_buf;
subop->bitmap_len = clean_entry_bitmap_size;
// Using rmw_buf to pass pointer to stripes. Dirty but should work
subop->rmw_buf = stripes+stripe_num;
if (role_osd_num == this->osd_num)
{
clock_gettime(CLOCK_REALTIME, &subop->tv_begin);
subop->op_type = (uint64_t)cur_op;
subop->bitmap = stripes[stripe_num].bmp_buf;
subop->bitmap_len = clean_entry_bitmap_size;
subop->bs_op = new blockstore_op_t({
subop->bs_op = new blockstore_op_t((blockstore_op_t){
.opcode = (uint64_t)(wr ? (rep ? BS_OP_WRITE_STABLE : BS_OP_WRITE) : BS_OP_READ),
.callback = [subop, this](blockstore_op_t *bs_subop)
{
handle_primary_bs_subop(subop);
},
.oid = {
.inode = inode,
.stripe = op_data->oid.stripe | stripe_num,
{
.oid = (object_id){
.inode = inode,
.stripe = op_data->oid.stripe | stripe_num,
},
.version = op_version,
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
},
.version = op_version,
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
.buf = wr ? stripes[stripe_num].write_buf : stripes[stripe_num].read_buf,
.bitmap = stripes[stripe_num].bmp_buf,
});
@@ -185,8 +189,6 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
else
{
subop->op_type = OSD_OP_OUT;
subop->bitmap = stripes[stripe_num].bmp_buf;
subop->bitmap_len = clean_entry_bitmap_size;
subop->req.sec_rw = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
@@ -243,6 +245,10 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
}
i++;
}
else
{
stripes[stripe_num].osd_num = 0;
}
}
return i-subop_idx;
}
@@ -332,9 +338,11 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE || opcode == OSD_OP_SEC_WRITE_STABLE)
{
printf(
"%s subop to %lx:%lx v%lu failed on peer %d: retval = %d (expected %d)\n",
subop->peer_fd >= 0
? "%1$s subop to %2$lx:%3$lx v%4$lu failed on peer %7$d: retval = %5$d (expected %6$d)\n"
: "%1$s subop to %2$lx:%3$lx v%4$lu failed locally: retval = %5$d (expected %6$d)\n",
osd_op_names[opcode], subop->req.sec_rw.oid.inode, subop->req.sec_rw.oid.stripe, subop->req.sec_rw.version,
subop->peer_fd, retval, expected
retval, expected, subop->peer_fd
);
}
else
@@ -344,22 +352,32 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
osd_op_names[opcode], subop->peer_fd, retval, expected
);
}
// Error priority: EIO > ENOSPC > EPIPE
if (op_data->errcode == 0 || retval == -EIO ||
if (opcode == OSD_OP_SEC_READ && (retval == -EIO || retval == -EDOM))
{
// We'll retry reads from other replica(s) on EIO/EDOM and mark object as corrupted
((osd_rmw_stripe_t*)subop->rmw_buf)->read_error = true;
}
subop->rmw_buf = NULL;
// Error priority: EIO > EDOM > ENOSPC > EPIPE
if (op_data->errcode == 0 ||
retval == -EIO ||
retval == -EDOM && (op_data->errcode == -ENOSPC || op_data->errcode == -EPIPE) ||
retval == -ENOSPC && op_data->errcode == -EPIPE)
{
op_data->errcode = retval;
}
op_data->errors++;
if (subop->peer_fd >= 0 && (opcode != OSD_OP_SEC_WRITE && opcode != OSD_OP_SEC_WRITE_STABLE ||
retval != -ENOSPC))
if (subop->peer_fd >= 0 && retval != -EDOM &&
(retval != -ENOSPC || opcode != OSD_OP_SEC_WRITE && opcode != OSD_OP_SEC_WRITE_STABLE) &&
(retval != -EIO || opcode != OSD_OP_SEC_READ))
{
// Drop connection on any error expect ENOSPC
// Drop connection on unexpected errors
msgr.stop_client(subop->peer_fd);
}
}
else
{
subop->rmw_buf = NULL;
op_data->done++;
if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE || opcode == OSD_OP_SEC_WRITE_STABLE)
{
@@ -403,6 +421,10 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
{
continue_primary_del(cur_op);
}
else if (cur_op->req.hdr.opcode == OSD_OP_SCRUB)
{
continue_primary_scrub(cur_op);
}
else
{
throw std::runtime_error("BUG: unknown opcode");
@@ -606,7 +628,9 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
{
handle_primary_bs_subop(subop);
},
.len = (uint32_t)stab_osd.len,
{
.len = (uint32_t)stab_osd.len,
},
.buf = (void*)(op_data->unstable_writes + stab_osd.start),
});
bs->enqueue_op(subops[i].bs_op);

View File

@@ -58,7 +58,13 @@ resume_1:
// Determine blocks to read and write
// Missing chunks are allowed to be overwritten even in incomplete objects
// FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for lower performance impact
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
op_data->prev_set = get_object_osd_set(pg, op_data->oid, &op_data->object_state);
if (op_data->object_state)
{
// Protect object_state from being freed by a parallel read operation changing it
op_data->object_state->ref_count++;
}
retry_1:
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
// Simplified algorithm
@@ -68,6 +74,12 @@ resume_1:
if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
op_data->stripes[0].write_end != bs_block_size))
{
if (op_data->object_state->state & OBJ_INCOMPLETE)
{
// Refuse partial overwrite of an incomplete (corrupted) object
cur_op->reply.hdr.retval = -EIO;
goto continue_others;
}
// Object is degraded/misplaced and will be moved to <write_osd_set>
op_data->stripes[0].read_start = 0;
op_data->stripes[0].read_end = bs_block_size;
@@ -81,24 +93,66 @@ resume_1:
if (!cur_op->rmw_buf)
{
// Refuse partial overwrite of an incomplete object
cur_op->reply.hdr.retval = -EINVAL;
cur_op->reply.hdr.retval = -EIO;
goto continue_others;
}
}
// Read required blocks
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op);
{
if (op_data->object_state && (op_data->object_state->state & OBJ_INCOMPLETE))
{
// Allow to read version number (just version number!) from corrupted chunks
// to allow full overwrite of a corrupted object
bool found = false;
for (int role = 0; role < op_data->pg_size; role++)
{
if (op_data->prev_set[role] != 0 || op_data->stripes[role].read_end > op_data->stripes[role].read_start)
{
found = true;
break;
}
}
if (!found)
{
osd_num_t corrupted_target[op_data->pg_size];
for (int role = 0; role < op_data->pg_size; role++)
{
corrupted_target[role] = 0;
}
for (auto & loc: op_data->object_state->osd_set)
{
if (!(loc.loc_bad & LOC_OUTDATED) && !corrupted_target[loc.role])
{
corrupted_target[loc.role] = loc.osd_num;
}
}
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, corrupted_target, cur_op);
goto resume_2;
}
}
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op);
}
resume_2:
op_data->st = 2;
return;
resume_3:
if (op_data->errors > 0)
{
if (op_data->errcode == -EIO || op_data->errcode == -EDOM)
{
// Mark object corrupted and retry
op_data->object_state = mark_object_corrupted(pg, op_data->oid, op_data->object_state, op_data->stripes, true);
op_data->prev_set = op_data->object_state ? op_data->object_state->read_target.data() : pg.cur_set.data();
goto retry_1;
}
deref_object_state(pg, &op_data->object_state, true);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
return;
}
// Check CAS version
if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1))
{
deref_object_state(pg, &op_data->object_state, true);
cur_op->reply.hdr.retval = -EINTR;
cur_op->reply.rw.version = op_data->fact_ver;
goto continue_others;
@@ -182,6 +236,7 @@ resume_10:
// Recheck PG state after reporting history - maybe it's already stopping/restarting
if (pg.state & (PG_STOPPING|PG_REPEERING))
{
deref_object_state(pg, &op_data->object_state, true);
pg_cancel_write_queue(pg, cur_op, op_data->oid, -EPIPE);
return;
}
@@ -197,6 +252,7 @@ resume_5:
}
if (op_data->errors > 0)
{
deref_object_state(pg, &op_data->object_state, true);
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->errcode);
return;
}
@@ -205,7 +261,7 @@ resume_5:
// We must forget the unclean state of the object before deleting it
// so the next reads don't accidentally read a deleted version
// And it should be done at the same time as the removal of the version override
remove_object_from_state(op_data->oid, op_data->object_state, pg);
remove_object_from_state(op_data->oid, &op_data->object_state, pg);
pg.clean_count++;
}
resume_6:
@@ -260,12 +316,12 @@ resume_7:
copies_to_delete_after_sync_count++;
}
}
free_object_state(pg, &op_data->object_state);
deref_object_state(pg, &op_data->object_state, true);
}
else
{
submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
free_object_state(pg, &op_data->object_state);
deref_object_state(pg, &op_data->object_state, true);
if (op_data->n_subops > 0)
{
resume_8:

View File

@@ -25,7 +25,9 @@ struct osd_rmw_stripe_t
uint32_t req_start, req_end;
uint32_t read_start, read_end;
uint32_t write_start, write_end;
bool missing;
osd_num_t osd_num;
bool missing: 1;
bool read_error: 1;
};
// Here pg_minsize is the number of data chunks, not the minimum number of alive OSDs for the PG to operate

531
src/osd_scrub.cpp Normal file
View File

@@ -0,0 +1,531 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
#include "osd_primary.h"
#define SELF_FD -1
void osd_t::scrub_list(pool_pg_num_t pg_id, osd_num_t role_osd, object_id min_oid)
{
pool_id_t pool_id = pg_id.pool_id;
pg_num_t pg_num = pg_id.pg_num;
assert(!scrub_list_op);
if (role_osd == this->osd_num)
{
// Self
osd_op_t *op = new osd_op_t();
op->op_type = 0;
op->peer_fd = SELF_FD;
clock_gettime(CLOCK_REALTIME, &op->tv_begin);
op->bs_op = new blockstore_op_t();
op->bs_op->opcode = BS_OP_LIST;
op->bs_op->pg_alignment = st_cli.pool_config[pool_id].pg_stripe_size;
if (min_oid.inode != 0 || min_oid.stripe != 0)
op->bs_op->min_oid = min_oid;
else
op->bs_op->min_oid.inode = ((uint64_t)pool_id << (64 - POOL_ID_BITS));
op->bs_op->max_oid.inode = ((uint64_t)(pool_id+1) << (64 - POOL_ID_BITS)) - 1;
op->bs_op->max_oid.stripe = UINT64_MAX;
op->bs_op->list_stable_limit = scrub_list_limit;
op->bs_op->pg_count = pg_counts[pool_id];
op->bs_op->pg_number = pg_num-1;
op->bs_op->callback = [this, op](blockstore_op_t *bs_op)
{
scrub_list_op = NULL;
if (op->bs_op->retval < 0)
{
printf("Local OP_LIST failed: retval=%d\n", op->bs_op->retval);
force_stop(1);
return;
}
add_bs_subop_stats(op);
scrub_cur_list = {
.buf = (obj_ver_id*)op->bs_op->buf,
.total_count = (uint64_t)op->bs_op->retval,
.stable_count = op->bs_op->version,
};
delete op->bs_op;
op->bs_op = NULL;
delete op;
continue_scrub();
};
scrub_list_op = op;
bs->enqueue_op(op->bs_op);
}
else
{
// Peer
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
op->peer_fd = msgr.osd_peer_fds.at(role_osd);
op->req = (osd_any_op_t){
.sec_list = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = msgr.next_subop_id++,
.opcode = OSD_OP_SEC_LIST,
},
.list_pg = pg_num,
.pg_count = pg_counts[pool_id],
.pg_stripe_size = st_cli.pool_config[pool_id].pg_stripe_size,
.min_inode = min_oid.inode ? min_oid.inode : ((uint64_t)(pool_id) << (64 - POOL_ID_BITS)),
.max_inode = ((uint64_t)(pool_id+1) << (64 - POOL_ID_BITS)) - 1,
.min_stripe = min_oid.stripe,
.stable_limit = scrub_list_limit,
},
};
op->callback = [this, role_osd](osd_op_t *op)
{
scrub_list_op = NULL;
if (op->reply.hdr.retval < 0)
{
printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval);
int fail_fd = op->peer_fd;
delete op;
msgr.stop_client(fail_fd);
return;
}
scrub_cur_list = {
.buf = (obj_ver_id*)op->buf,
.total_count = (uint64_t)op->reply.hdr.retval,
.stable_count = op->reply.sec_list.stable_count,
};
// set op->buf to NULL so it doesn't get freed
op->buf = NULL;
delete op;
continue_scrub();
};
scrub_list_op = op;
msgr.outbox_push(op);
}
}
bool osd_t::pick_next_scrub(object_id & next_oid)
{
if (!pgs.size())
{
if (scrub_cur_list.buf)
{
free(scrub_cur_list.buf);
scrub_cur_list = {};
scrub_last_pg = {};
}
return false;
}
timespec tv_now;
clock_gettime(CLOCK_REALTIME, &tv_now);
bool rescan = scrub_last_pg.pool_id != 0 || scrub_last_pg.pg_num != 0;
// Restart scanning from the same PG as the last time
auto pg_it = pgs.lower_bound(scrub_last_pg);
while (pg_it != pgs.end())
{
if (pg_it->second.state & PG_ACTIVE)
{
auto & pool_cfg = st_cli.pool_config.at(pg_it->first.pool_id);
auto interval = pool_cfg.scrub_interval ? pool_cfg.scrub_interval : global_scrub_interval;
if (pg_it->second.scrub_ts < tv_now.tv_sec-interval)
{
// Continue scrubbing from the next object
if (scrub_last_pg == pg_it->first)
{
while (scrub_list_pos < scrub_cur_list.total_count)
{
auto oid = scrub_cur_list.buf[scrub_list_pos].oid;
oid.stripe &= ~STRIPE_MASK;
scrub_list_pos++;
if (recovery_ops.find(oid) == recovery_ops.end() &&
scrub_ops.find(oid) == scrub_ops.end())
{
next_oid = oid;
if (!(pg_it->second.state & PG_SCRUBBING))
{
// Currently scrubbing this PG
pg_it->second.state = pg_it->second.state | PG_SCRUBBING;
report_pg_state(pg_it->second);
}
return true;
}
}
}
if (scrub_last_pg == pg_it->first &&
scrub_cur_list.total_count && scrub_list_pos >= scrub_cur_list.total_count &&
scrub_cur_list.stable_count < scrub_list_limit)
{
// End of the list, mark this PG as scrubbed and go to the next PG
}
else
{
// Continue listing
object_id scrub_last_oid;
if (scrub_last_pg != pg_it->first)
scrub_last_oid = (object_id){};
else if (scrub_cur_list.stable_count > 0)
{
scrub_last_oid = scrub_cur_list.buf[scrub_cur_list.stable_count-1].oid;
scrub_last_oid.stripe++;
}
osd_num_t scrub_osd = 0;
for (osd_num_t pg_osd: pg_it->second.cur_set)
{
if (pg_osd == this->osd_num || scrub_osd == 0)
scrub_osd = pg_osd;
}
if (!(pg_it->second.state & PG_SCRUBBING))
{
// Currently scrubbing this PG
pg_it->second.state = pg_it->second.state | PG_SCRUBBING;
report_pg_state(pg_it->second);
}
if (scrub_cur_list.buf)
{
free(scrub_cur_list.buf);
scrub_cur_list = {};
scrub_last_oid = {};
}
scrub_last_pg = pg_it->first;
scrub_list(pg_it->first, scrub_osd, scrub_last_oid);
return true;
}
}
if (pg_it->second.state & PG_SCRUBBING)
{
pg_it->second.scrub_ts = tv_now.tv_sec;
pg_it->second.state = pg_it->second.state & ~PG_SCRUBBING;
pg_it->second.history_changed = true;
report_pg_state(pg_it->second);
schedule_scrub(pg_it->second);
}
// The list is definitely not needed anymore
if (scrub_cur_list.buf)
{
free(scrub_cur_list.buf);
scrub_cur_list = {};
}
}
pg_it++;
if (pg_it == pgs.end() && rescan)
{
// Scan one more time to guarantee that there are no PGs to scrub
pg_it = pgs.begin();
rescan = false;
}
}
// Scanned all PGs - no more scrubs to do
return false;
}
void osd_t::submit_scrub_op(object_id oid)
{
auto osd_op = new osd_op_t();
osd_op->op_type = OSD_OP_OUT;
osd_op->req = (osd_any_op_t){
.rw = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = 1,
.opcode = OSD_OP_SCRUB,
},
.inode = oid.inode,
.offset = oid.stripe,
.len = 0,
},
};
if (log_level > 2)
{
printf("Submitting scrub for %lx:%lx\n", oid.inode, oid.stripe);
}
osd_op->callback = [this](osd_op_t *osd_op)
{
object_id oid = { .inode = osd_op->req.rw.inode, .stripe = osd_op->req.rw.offset };
if (osd_op->reply.hdr.retval < 0 && osd_op->reply.hdr.retval != -ENOENT)
{
// Scrub error
printf(
"Scrub failed with object %lx:%lx (PG %u/%u): error %ld\n",
oid.inode, oid.stripe, INODE_POOL(oid.inode),
map_to_pg(oid, st_cli.pool_config.at(INODE_POOL(oid.inode)).pg_stripe_size),
osd_op->reply.hdr.retval
);
}
else if (log_level > 2)
{
printf("Scrubbed %lx:%lx OK\n", oid.inode, oid.stripe);
}
delete osd_op;
if (scrub_sleep_ms)
{
this->tfd->set_timer(scrub_sleep_ms, false, [this, oid](int timer_id)
{
scrub_ops.erase(oid);
continue_scrub();
});
}
else
{
scrub_ops.erase(oid);
continue_scrub();
}
};
scrub_ops[oid] = osd_op;
exec_op(osd_op);
}
// Triggers scrub requests
// Scrub reads data from all replicas and compares it
// To scrub first we need to read objects listings
bool osd_t::continue_scrub()
{
if (scrub_list_op)
{
return true;
}
while (scrub_ops.size() < scrub_queue_depth)
{
object_id oid;
if (pick_next_scrub(oid))
submit_scrub_op(oid);
else
return false;
}
return true;
}
void osd_t::schedule_scrub(pg_t & pg)
{
auto & pool_cfg = st_cli.pool_config.at(pg.pool_id);
auto interval = pool_cfg.scrub_interval ? pool_cfg.scrub_interval : global_scrub_interval;
if (!scrub_nearest_ts || scrub_nearest_ts > pg.scrub_ts+interval)
{
scrub_nearest_ts = pg.scrub_ts+interval;
timespec tv_now;
clock_gettime(CLOCK_REALTIME, &tv_now);
if (scrub_timer_id >= 0)
{
tfd->clear_timer(scrub_timer_id);
scrub_timer_id = -1;
}
if (tv_now.tv_sec > scrub_nearest_ts)
{
scrub_nearest_ts = 0;
peering_state = peering_state | OSD_SCRUBBING;
ringloop->wakeup();
}
else
{
scrub_timer_id = tfd->set_timer((scrub_nearest_ts-tv_now.tv_sec)*1000, false, [this](int timer_id)
{
scrub_timer_id = -1;
scrub_nearest_ts = 0;
peering_state = peering_state | OSD_SCRUBBING;
ringloop->wakeup();
});
}
}
}
void osd_t::continue_primary_scrub(osd_op_t *cur_op)
{
if (!cur_op->op_data && !prepare_primary_rw(cur_op))
return;
osd_primary_op_data_t *op_data = cur_op->op_data;
if (op_data->st == 1)
goto resume_1;
else if (op_data->st == 2)
goto resume_2;
{
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
// Determine version
auto vo_it = pg.ver_override.find(op_data->oid);
op_data->target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
// PG may have degraded or misplaced objects
op_data->prev_set = get_object_osd_set(pg, op_data->oid, &op_data->object_state);
// Read all available chunks
int n_copies = 0;
op_data->degraded = false;
for (int role = 0; role < op_data->pg_size; role++)
{
op_data->stripes[role].read_start = 0;
op_data->stripes[role].read_end = bs_block_size;
if (op_data->prev_set[role] != 0)
{
n_copies++;
}
else if (op_data->scheme != POOL_SCHEME_REPLICATED && role < op_data->pg_data_size)
{
op_data->degraded = true;
}
}
if (n_copies <= op_data->pg_data_size)
{
// Nothing to compare, even if we'd like to
finish_op(cur_op, 0);
return;
}
cur_op->buf = alloc_read_buffer(op_data->stripes, op_data->pg_size,
op_data->scheme != POOL_SCHEME_REPLICATED ? bs_block_size*(op_data->pg_size-op_data->pg_data_size) : 0);
// Submit reads
osd_op_t *subops = new osd_op_t[n_copies];
op_data->fact_ver = 0;
op_data->done = op_data->errors = op_data->errcode = 0;
op_data->n_subops = n_copies;
op_data->subops = subops;
int sent = submit_primary_subop_batch(SUBMIT_SCRUB_READ, op_data->oid.inode, op_data->target_ver,
op_data->stripes, op_data->prev_set, cur_op, 0, -1);
assert(sent == n_copies);
op_data->st = 1;
}
resume_1:
return;
resume_2:
if (op_data->errors > 0)
{
if (op_data->errcode == -EIO || op_data->errcode == -EDOM)
{
// I/O or checksum error
int n_copies = 0;
for (int role = 0; role < op_data->pg_size; role++)
{
if (op_data->stripes[role].read_end != 0 &&
!op_data->stripes[role].read_error)
{
n_copies++;
}
}
if (n_copies <= op_data->pg_data_size)
{
// Nothing to compare, just mark the object as corrupted
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
// FIXME: ref = true ideally... because new_state != state is not necessarily true if it's freed and recreated
op_data->object_state = mark_object_corrupted(pg, op_data->oid, op_data->object_state, op_data->stripes, false);
// Operation is treated as unsuccessful only if the object becomes unreadable
finish_op(cur_op, n_copies < op_data->pg_data_size ? op_data->errcode : 0);
return;
}
// Proceed, we can still compare chunks that were successfully read
}
else
{
finish_op(cur_op, op_data->errcode);
return;
}
}
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
// Check that all chunks have returned the same data
int total = 0;
int eq_to[op_data->pg_size];
for (int role = 0; role < op_data->pg_size; role++)
{
eq_to[role] = -1;
if (op_data->stripes[role].read_end != 0 && !op_data->stripes[role].read_error)
{
total++;
eq_to[role] = role;
for (int other = 0; other < role; other++)
{
// Only compare with unique chunks (eq_to[other] == other)
if (eq_to[other] == other && memcmp(op_data->stripes[role].read_buf, op_data->stripes[other].read_buf, bs_block_size) == 0)
{
eq_to[role] = eq_to[other];
break;
}
}
}
}
int votes[op_data->pg_size];
for (int role = 0; role < op_data->pg_size; role++)
votes[role] = 0;
for (int role = 0; role < op_data->pg_size; role++)
{
if (eq_to[role] != -1)
votes[eq_to[role]]++;
}
int best = -1;
for (int role = 0; role < op_data->pg_size; role++)
{
if (best < 0 && votes[role] > 0 || votes[role] > votes[best])
best = role;
}
if (best > 0 && votes[best] < total)
{
// FIXME Add a flag to allow to skip such objects and not recover them automatically
bool unknown = false;
for (int role = 0; role < op_data->pg_size; role++)
{
if (role != best && votes[role] == votes[best])
unknown = true;
if (votes[role] > 0 && votes[role] < votes[best])
{
printf(
"[PG %u/%u] Object %lx:%lx copy on OSD %lu doesn't match %d other copies, marking it as corrupted\n",
INODE_POOL(op_data->oid.inode), op_data->pg_num,
op_data->oid.inode, op_data->oid.stripe, op_data->stripes[role].osd_num, votes[best]
);
op_data->stripes[role].read_error = true;
}
}
if (unknown)
{
// It's unknown which replica is good. There are multiple versions with no majority
best = -1;
}
}
}
else
{
assert(op_data->scheme == POOL_SCHEME_EC || op_data->scheme == POOL_SCHEME_XOR);
if (op_data->degraded)
{
// Reconstruct missing stripes
// XOR shouldn't come here as it only has 1 parity chunk
assert(op_data->scheme == POOL_SCHEME_EC);
reconstruct_stripes_ec(op_data->stripes, op_data->pg_size, op_data->pg_data_size, clean_entry_bitmap_size);
}
// Generate parity chunks and compare them with actual data
osd_num_t fake_osd_set[op_data->pg_size];
for (int i = 0; i < op_data->pg_size; i++)
{
fake_osd_set[i] = 1;
op_data->stripes[i].write_buf = i >= op_data->pg_data_size
? ((uint8_t*)cur_op->buf + (i-op_data->pg_data_size)*bs_block_size)
: op_data->stripes[i].read_buf;
}
if (op_data->scheme == POOL_SCHEME_XOR)
{
calc_rmw_parity_xor(op_data->stripes, op_data->pg_size, fake_osd_set, fake_osd_set, bs_block_size, clean_entry_bitmap_size);
}
else if (op_data->scheme == POOL_SCHEME_EC)
{
calc_rmw_parity_ec(op_data->stripes, op_data->pg_size, op_data->pg_data_size, fake_osd_set, fake_osd_set, bs_block_size, clean_entry_bitmap_size);
}
// Now compare that write_buf == read_buf
for (int role = op_data->pg_data_size; role < op_data->pg_size; role++)
{
if (op_data->stripes[role].osd_num != 0 && !op_data->stripes[role].read_error &&
memcmp(op_data->stripes[role].read_buf, op_data->stripes[role].write_buf, bs_block_size) != 0)
{
// Chunks don't match - something's wrong... but we don't know what :D
// FIXME: Try to locate errors (may be possible with >= 2 parity chunks)
printf(
"[PG %u/%u] Object %lx:%lx parity chunk %d on OSD %lu doesn't match data, marking it as corrupted\n",
INODE_POOL(op_data->oid.inode), op_data->pg_num,
op_data->oid.inode, op_data->oid.stripe,
role-op_data->pg_data_size, op_data->stripes[role].osd_num
);
op_data->stripes[role].read_error = true;
}
}
}
for (int role = 0; role < op_data->pg_size; role++)
{
if (op_data->stripes[role].osd_num != 0 && !op_data->stripes[role].read_error)
{
// Got at least 1 read error or mismatch, mark the object as corrupted
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
// FIXME: ref = true ideally... because new_state != state is not necessarily true if it's freed and recreated
op_data->object_state = mark_object_corrupted(pg, op_data->oid, op_data->object_state, op_data->stripes, false);
break;
}
}
finish_op(cur_op, 0);
}

View File

@@ -125,11 +125,18 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
secondary_op_callback(cur_op);
return;
}
cur_op->bs_op->oid.stripe = cur_op->req.sec_list.pg_stripe_size;
cur_op->bs_op->len = cur_op->req.sec_list.pg_count;
cur_op->bs_op->offset = cur_op->req.sec_list.list_pg - 1;
cur_op->bs_op->oid.inode = cur_op->req.sec_list.min_inode;
cur_op->bs_op->version = cur_op->req.sec_list.max_inode;
cur_op->bs_op->pg_alignment = cur_op->req.sec_list.pg_stripe_size;
cur_op->bs_op->pg_count = cur_op->req.sec_list.pg_count;
cur_op->bs_op->pg_number = cur_op->req.sec_list.list_pg - 1;
cur_op->bs_op->min_oid.inode = cur_op->req.sec_list.min_inode;
cur_op->bs_op->min_oid.stripe = cur_op->req.sec_list.min_stripe;
cur_op->bs_op->max_oid.inode = cur_op->req.sec_list.max_inode;
if (cur_op->req.sec_list.max_inode && cur_op->req.sec_list.max_stripe != UINT64_MAX)
{
cur_op->bs_op->max_oid.stripe = cur_op->req.sec_list.max_stripe
? cur_op->req.sec_list.max_stripe : UINT64_MAX;
}
cur_op->bs_op->list_stable_limit = cur_op->req.sec_list.stable_limit;
#ifdef OSD_STUB
cur_op->bs_op->retval = 0;
cur_op->bs_op->buf = NULL;

View File

@@ -150,7 +150,6 @@ int connect_osd(const char *osd_address, int osd_port)
if (connect(connect_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
{
perror("connect");
close(connect_fd);
return -1;
}
int one = 1;

View File

@@ -3,9 +3,9 @@
#include "pg_states.h"
const int pg_state_bit_count = 14;
const int pg_state_bit_count = 16;
const int pg_state_bits[14] = {
const int pg_state_bits[16] = {
PG_STARTING,
PG_PEERING,
PG_INCOMPLETE,
@@ -14,15 +14,17 @@ const int pg_state_bits[14] = {
PG_STOPPING,
PG_OFFLINE,
PG_DEGRADED,
PG_HAS_CORRUPTED,
PG_HAS_INCOMPLETE,
PG_HAS_DEGRADED,
PG_HAS_MISPLACED,
PG_HAS_UNCLEAN,
PG_HAS_INVALID,
PG_LEFT_ON_DEAD,
PG_SCRUBBING,
};
const char *pg_state_names[14] = {
const char *pg_state_names[16] = {
"starting",
"peering",
"incomplete",
@@ -31,10 +33,12 @@ const char *pg_state_names[14] = {
"stopping",
"offline",
"degraded",
"has_corrupted",
"has_incomplete",
"has_degraded",
"has_misplaced",
"has_unclean",
"has_invalid",
"left_on_dead",
"scrubbing",
};

View File

@@ -22,7 +22,9 @@
#define PG_HAS_MISPLACED (1<<10)
#define PG_HAS_UNCLEAN (1<<11)
#define PG_HAS_INVALID (1<<12)
#define PG_LEFT_ON_DEAD (1<<13)
#define PG_HAS_CORRUPTED (1<<13)
#define PG_LEFT_ON_DEAD (1<<14)
#define PG_SCRUBBING (1<<15)
// Lower bits that represent object role (EC 0/1/2... or always 0 with replication)
// 12 bits is a safe default that doesn't depend on pg_stripe_size or pg_block_size
@@ -32,6 +34,8 @@
#define OBJ_DEGRADED 0x02
#define OBJ_INCOMPLETE 0x04
#define OBJ_MISPLACED 0x08
// OBJ_CORRUPTED is always set with one of OBJ_INCOMPLETE/OBJ_DEGRADED/OBJ_MISPLACED
#define OBJ_CORRUPTED 0x10
#define OBJ_NEEDS_STABLE 0x10000
#define OBJ_NEEDS_ROLLBACK 0x20000

View File

@@ -15,7 +15,7 @@ int read_blocking(int fd, void *read_buf, size_t remaining)
size_t done = 0;
while (done < remaining)
{
ssize_t r = read(fd, read_buf, remaining-done);
size_t r = read(fd, read_buf, remaining-done);
if (r <= 0)
{
if (!errno)
@@ -41,7 +41,7 @@ int write_blocking(int fd, void *write_buf, size_t remaining)
size_t done = 0;
while (done < remaining)
{
ssize_t r = write(fd, write_buf, remaining-done);
size_t r = write(fd, write_buf, remaining-done);
if (r < 0)
{
if (errno != EINTR && errno != EAGAIN && errno != EPIPE)

View File

@@ -249,3 +249,35 @@ void print_help(const char *help_text, std::string exe_name, std::string cmd, bo
fwrite(filtered_text.data(), filtered_text.size(), 1, stdout);
exit(0);
}
uint64_t parse_time(std::string time_str, bool *ok)
{
if (!time_str.length())
{
if (ok)
*ok = false;
return 0;
}
uint64_t mul = 1;
char type_char = tolower(time_str[time_str.length()-1]);
if (type_char == 's' || type_char == 'm' || type_char == 'h' || type_char == 'd' || type_char == 'y')
{
if (type_char == 's')
mul = 1;
else if (time_str[time_str.length()-1] == 'M')
mul = 30*86400;
else if (type_char == 'm')
mul = 60;
else if (type_char == 'h')
mul = 3600;
else if (type_char == 'd')
mul = 86400;
else /*if (type_char == 'y')*/
mul = 86400*365;
time_str = time_str.substr(0, time_str.length()-1);
}
uint64_t ts = stoull_full(time_str, 0) * mul;
if (ok)
*ok = !(ts == 0 && time_str != "0" && (time_str != "" || mul != 1));
return ts;
}

View File

@@ -15,3 +15,4 @@ std::string str_replace(const std::string & in, const std::string & needle, cons
uint64_t stoull_full(const std::string & str, int base = 0);
std::string format_size(uint64_t size, bool nobytes = false);
void print_help(const char *help_text, std::string exe_name, std::string cmd, bool all);
uint64_t parse_time(std::string time_str, bool *ok = NULL);

View File

@@ -83,7 +83,6 @@ int connect_stub(const char *server_address, int server_port)
if (connect(connect_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
{
perror("connect");
close(connect_fd);
return -1;
}
int one = 1;

View File

@@ -6,7 +6,7 @@ includedir=${prefix}/@CMAKE_INSTALL_INCLUDEDIR@
Name: Vitastor
Description: Vitastor client library
Version: 0.8.6
Version: 0.8.5
Libs: -L${libdir} -lvitastor_client
Cflags: -I${includedir}