Compare commits

23 Commits

Author SHA1 Message Date
85298ddae2 Release 0.6.15
- Make peering much faster in medium to large clusters
- Fix a reenterability issue which could rarely lead to peering process hangs
2022-03-06 19:16:34 +03:00
e23296a327 Rename cli_rm -> cli_rm_data, cli_snap_rm -> cli_rm 2022-02-24 14:34:14 +03:00
839ec9e6e0 Shard clean_db by PGs to speedup listings 2022-02-20 00:21:24 +03:00
7cbfdff41a Replace some throws with force_stop 2022-02-20 00:21:19 +03:00
951272f27f Try to process PG one after another 2022-02-19 19:25:55 +03:00
a3fb1d4c98 Fix reenterability around set_timer 2022-02-19 18:28:12 +03:00
88402e6eb6 Move next_request to run_cb_and_clear 2022-02-19 16:59:03 +03:00
390239c51b Don't terminate HTTP requests with timeouts if response is already available in the socket 2022-02-19 13:37:12 +03:00
b7b2adfa32 Fix http client not continuing requests in case of failure to connect 2022-02-19 13:36:26 +03:00
36c276358b Attempt to fix "head-of-line blocking" by LIST operations 2022-02-18 01:31:45 +03:00
117d6f0612 Release 0.6.14
- Fix IPv6 address parsing
- Fix "cannot read bytes of undefined" in the monitor on a fresh DB
- Fix possible hangs of read requests on OSD restarts without immediate_commit=all mode
- Fix OSDs skipping misplaced recovery in some cases
- Fix OSDs possibly dying with "map::at" errors when other OSDs are stopped
- Fix division by zero in ls if all pool OSDs are down
2022-02-17 14:43:44 +03:00
7d79c58095 Use the larger sockaddr_storage structure 2022-02-12 11:22:56 +03:00
46d2bc100f Add some tolerance to stat calculation so it does not fail on a fresh DB 2022-02-11 16:37:16 +03:00
732e2804e9 Fix operation dependency counter underflow for reads without immediate_commit=all mode 2022-02-11 10:54:11 +03:00
abaec2008c Fix OSDs missing misplaced recovery 2022-02-11 01:00:24 +03:00
8129d238a4 Different fio versions have different types for xfer_buflen, but Vitastor anyway does not support 128-bit offsets 2022-02-10 01:21:04 +03:00
61ebed144a Fix OSDs possibly dying with "map::at" errors when other OSDs are stopped 2022-02-09 10:35:29 +03:00
9d3ba113aa Extract bind socket code into a utility function 2022-02-06 00:39:52 +03:00
9788045dc9 Fix division by zero in ls if all pool OSDs are down 2022-02-05 17:03:37 +03:00
d6b0d29af6 4k MEM_ALIGNMENT 2022-02-05 17:03:37 +03:00
36f352f06f Release 0.6.13
- Fix client hangs possible on OSD restarts (bug affected versions from 0.5.11)
- Fix "Assertion `sqe != NULL' failed" io_uring-related crashes possible
  on some kernels (0.6.11 increased probability of this bug)
- Fix timeout=0 in NBD proxy
- Fix build under centos 7
2022-02-03 01:50:30 +03:00
318cc463c2 Fix warnings 2022-02-03 01:50:30 +03:00
145e5cfb86 MCL_ONFAULT is not available under centos 7 2022-02-03 01:42:19 +03:00
53 changed files with 1208 additions and 1049 deletions

@@ -2,6 +2,6 @@ cmake_minimum_required(VERSION 2.8)
 project(vitastor)
-set(VERSION "0.6.12")
+set(VERSION "0.6.15")
 add_subdirectory(src)

@@ -1,4 +1,4 @@
-VERSION ?= v0.6.12
+VERSION ?= v0.6.15
 all: build push

@@ -49,7 +49,7 @@ spec:
 capabilities:
 add: ["SYS_ADMIN"]
 allowPrivilegeEscalation: true
-image: vitalif/vitastor-csi:v0.6.12
+image: vitalif/vitastor-csi:v0.6.15
 args:
 - "--node=$(NODE_ID)"
 - "--endpoint=$(CSI_ENDPOINT)"

@@ -116,7 +116,7 @@ spec:
 privileged: true
 capabilities:
 add: ["SYS_ADMIN"]
-image: vitalif/vitastor-csi:v0.6.12
+image: vitalif/vitastor-csi:v0.6.15
 args:
 - "--node=$(NODE_ID)"
 - "--endpoint=$(CSI_ENDPOINT)"

@@ -5,7 +5,7 @@ package vitastor
 const (
 vitastorCSIDriverName = "csi.vitastor.io"
-vitastorCSIDriverVersion = "0.6.12"
+vitastorCSIDriverVersion = "0.6.15"
 )
 // Config struct fills the parameters of request or user input

debian/changelog

@@ -1,4 +1,4 @@
-vitastor (0.6.12-1) unstable; urgency=medium
+vitastor (0.6.15-1) unstable; urgency=medium
 * RDMA support
 * Bugfixes

@@ -33,8 +33,8 @@ RUN set -e -x; \
 mkdir -p /root/packages/vitastor-$REL; \
 rm -rf /root/packages/vitastor-$REL/*; \
 cd /root/packages/vitastor-$REL; \
-cp -r /root/vitastor vitastor-0.6.12; \
-cd vitastor-0.6.12; \
+cp -r /root/vitastor vitastor-0.6.15; \
+cd vitastor-0.6.15; \
 ln -s /root/fio-build/fio-*/ ./fio; \
 FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
 ls /usr/include/linux/raw.h || cp ./debian/raw.h /usr/include/linux/raw.h; \
@@ -47,8 +47,8 @@ RUN set -e -x; \
 rm -rf a b; \
 echo "dep:fio=$FIO" > debian/fio_version; \
 cd /root/packages/vitastor-$REL; \
-tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.12.orig.tar.xz vitastor-0.6.12; \
-cd vitastor-0.6.12; \
+tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.15.orig.tar.xz vitastor-0.6.15; \
+cd vitastor-0.6.15; \
 V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
 DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
 DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \

@@ -48,28 +48,19 @@
 type: string
 info: |
 RDMA device name to use for Vitastor OSD communications (for example,
-"rocep5s0f0"). Please note that if your RDMA device doesn't support
-Implicit ODP (Implicit On-Demand Paging) then all Vitastor OSDs and clients
-will have to use mlockall() to lock all application memory to use RDMA.
-In case of the native Vitastor QEMU driver with RDMA, all virtual machine
-memory will be locked if your RDMA device doesn't support Implicit ODP.
-Notably, Mellanox ConnectX-3 and older adapters don't support Implicit ODP,
-while ConnectX-4 and newer do. Run `ibv_devinfo -v` as root to list
-available RDMA devices and their features.
+"rocep5s0f0"). Please note that Vitastor RDMA requires Implicit On-Demand
+Paging (Implicit ODP) and Scatter/Gather (SG) support from the RDMA device
+to work. For example, Mellanox ConnectX-3 and older adapters don't have
+Implicit ODP, so they're unsupported by Vitastor. Run `ibv_devinfo -v` as
+root to list available RDMA devices and their features.
 info_ru: |
 Название RDMA-устройства для связи с Vitastor OSD (например, "rocep5s0f0").
-Имейте в виду, что если ваше устройство не поддерживает Implicit ODP
-(Implicit On-Demand Paging), то все OSD и клиенты Vitastor будут вынуждены
-блокировать всю память приложения с помощью mlockall(), чтобы задействовать
-RDMA. В случае нативного QEMU-драйвера это будет означать, что при
-использовании RDMA на устройстве без поддержки Implicit ODP блокироваться
-от выгрузки будет вся память виртуальных машин.
-В случае с адаптерами Mellanox Implicit ODP поддерживается начиная с
-ConnectX-4. ConnectX-3 и более старые адаптеры не поддерживают Implicit ODP.
-Чтобы посмотреть список своих RDMA-устройств и их возможностей, запустите
-`ibv_devinfo -v` от имени суперпользователя.
+Имейте в виду, что поддержка RDMA в Vitastor требует функций устройства
+Implicit On-Demand Paging (Implicit ODP) и Scatter/Gather (SG). Например,
+адаптеры Mellanox ConnectX-3 и более старые не поддерживают Implicit ODP и
+потому не поддерживаются в Vitastor. Запустите `ibv_devinfo -v` от имени
+суперпользователя, чтобы посмотреть список доступных RDMA-устройств, их
+параметры и возможности.
 - name: rdma_port_num
 type: int
 default: 1

@@ -1345,21 +1345,30 @@ class Mon
 const tm = prev_stats ? BigInt(timestamp - prev_stats.timestamp) : 0;
 for (const op in op_stats)
 {
-op_stats[op].bps = prev_stats ? (op_stats[op].bytes - prev_stats.op_stats[op].bytes) * 1000n / tm : 0;
-op_stats[op].iops = prev_stats ? (op_stats[op].count - prev_stats.op_stats[op].count) * 1000n / tm : 0;
-op_stats[op].lat = prev_stats ? (op_stats[op].usec - prev_stats.op_stats[op].usec)
-/ ((op_stats[op].count - prev_stats.op_stats[op].count) || 1n) : 0;
+if (prev_stats && prev_stats.op_stats && prev_stats.op_stats[op])
+{
+op_stats[op].bps = (op_stats[op].bytes - prev_stats.op_stats[op].bytes) * 1000n / tm;
+op_stats[op].iops = (op_stats[op].count - prev_stats.op_stats[op].count) * 1000n / tm;
+op_stats[op].lat = (op_stats[op].usec - prev_stats.op_stats[op].usec)
+/ ((op_stats[op].count - prev_stats.op_stats[op].count) || 1n);
+}
 }
 for (const op in subop_stats)
 {
-subop_stats[op].iops = prev_stats ? (subop_stats[op].count - prev_stats.subop_stats[op].count) * 1000n / tm : 0;
-subop_stats[op].lat = prev_stats ? (subop_stats[op].usec - prev_stats.subop_stats[op].usec)
-/ ((subop_stats[op].count - prev_stats.subop_stats[op].count) || 1n) : 0;
+if (prev_stats && prev_stats.subop_stats && prev_stats.subop_stats[op])
+{
+subop_stats[op].iops = (subop_stats[op].count - prev_stats.subop_stats[op].count) * 1000n / tm;
+subop_stats[op].lat = (subop_stats[op].usec - prev_stats.subop_stats[op].usec)
+/ ((subop_stats[op].count - prev_stats.subop_stats[op].count) || 1n);
+}
 }
 for (const op in recovery_stats)
 {
-recovery_stats[op].bps = prev_stats ? (recovery_stats[op].bytes - prev_stats.recovery_stats[op].bytes) * 1000n / tm : 0;
-recovery_stats[op].iops = prev_stats ? (recovery_stats[op].count - prev_stats.recovery_stats[op].count) * 1000n / tm : 0;
+if (prev_stats && prev_stats.recovery_stats && prev_stats.recovery_stats[op])
+{
+recovery_stats[op].bps = (recovery_stats[op].bytes - prev_stats.recovery_stats[op].bytes) * 1000n / tm;
+recovery_stats[op].iops = (recovery_stats[op].count - prev_stats.recovery_stats[op].count) * 1000n / tm;
+}
 }
 return { op_stats, subop_stats, recovery_stats };
 }

@@ -50,7 +50,7 @@ from cinder.volume import configuration
 from cinder.volume import driver
 from cinder.volume import volume_utils
-VERSION = '0.6.12'
+VERSION = '0.6.15'
 LOG = logging.getLogger(__name__)

@@ -25,4 +25,4 @@ rm fio
 mv fio-copy fio
 FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
 perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
-tar --transform 's#^#vitastor-0.6.12/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.12$(rpm --eval '%dist').tar.gz *
+tar --transform 's#^#vitastor-0.6.15/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.15$(rpm --eval '%dist').tar.gz *

@@ -34,7 +34,7 @@ ADD . /root/vitastor
 RUN set -e; \
 cd /root/vitastor/rpm; \
 sh build-tarball.sh; \
-cp /root/vitastor-0.6.12.el7.tar.gz ~/rpmbuild/SOURCES; \
+cp /root/vitastor-0.6.15.el7.tar.gz ~/rpmbuild/SOURCES; \
 cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
 cd ~/rpmbuild/SPECS/; \
 rpmbuild -ba vitastor.spec; \

@@ -1,11 +1,11 @@
 Name: vitastor
-Version: 0.6.12
+Version: 0.6.15
 Release: 1%{?dist}
 Summary: Vitastor, a fast software-defined clustered block storage
 License: Vitastor Network Public License 1.1
 URL: https://vitastor.io/
-Source0: vitastor-0.6.12.el7.tar.gz
+Source0: vitastor-0.6.15.el7.tar.gz
 BuildRequires: liburing-devel >= 0.6
 BuildRequires: gperftools-devel

@@ -33,7 +33,7 @@ ADD . /root/vitastor
 RUN set -e; \
 cd /root/vitastor/rpm; \
 sh build-tarball.sh; \
-cp /root/vitastor-0.6.12.el8.tar.gz ~/rpmbuild/SOURCES; \
+cp /root/vitastor-0.6.15.el8.tar.gz ~/rpmbuild/SOURCES; \
 cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
 cd ~/rpmbuild/SPECS/; \
 rpmbuild -ba vitastor.spec; \

@@ -1,11 +1,11 @@
 Name: vitastor
-Version: 0.6.12
+Version: 0.6.15
 Release: 1%{?dist}
 Summary: Vitastor, a fast software-defined clustered block storage
 License: Vitastor Network Public License 1.1
 URL: https://vitastor.io/
-Source0: vitastor-0.6.12.el8.tar.gz
+Source0: vitastor-0.6.15.el8.tar.gz
 BuildRequires: liburing-devel >= 0.6
 BuildRequires: gperftools-devel

@@ -15,7 +15,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
 set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}")
 endif()
-add_definitions(-DVERSION="0.6.12")
+add_definitions(-DVERSION="0.6.15")
 add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fdiagnostics-color=always -I ${CMAKE_SOURCE_DIR}/src)
 if (${WITH_ASAN})
 add_definitions(-fsanitize=address -fno-omit-frame-pointer)
@@ -155,7 +155,7 @@ target_link_libraries(vitastor-nbd
 # vitastor-cli
 add_executable(vitastor-cli
 cli.cpp cli_alloc_osd.cpp cli_simple_offsets.cpp cli_df.cpp
-cli_ls.cpp cli_create.cpp cli_modify.cpp cli_flatten.cpp cli_merge.cpp cli_rm.cpp cli_snap_rm.cpp
+cli_ls.cpp cli_create.cpp cli_modify.cpp cli_flatten.cpp cli_merge.cpp cli_rm_data.cpp cli_rm.cpp
 )
 target_link_libraries(vitastor-cli
 vitastor_client

@@ -1,3 +1,5 @@
+#include <sys/socket.h>
+#include <unistd.h>
 #include <arpa/inet.h>
 #include <net/if.h>
 #include <sys/types.h>
@@ -9,7 +11,7 @@
 #include "addr_util.h"
-bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr)
+bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr_storage *addr)
 {
 if (parse_port)
 {
@@ -25,7 +27,7 @@ bool string_to_addr(std::string str, bool parse_port, int default_port, struct s
 }
 if (inet_pton(AF_INET, str.c_str(), &((struct sockaddr_in*)addr)->sin_addr) == 1)
 {
-addr->sa_family = AF_INET;
+addr->ss_family = AF_INET;
 ((struct sockaddr_in*)addr)->sin_port = htons(default_port);
 return true;
 }
@@ -33,30 +35,30 @@ bool string_to_addr(std::string str, bool parse_port, int default_port, struct s
 str = str.substr(1, str.length()-2);
 if (inet_pton(AF_INET6, str.c_str(), &((struct sockaddr_in6*)addr)->sin6_addr) == 1)
 {
-addr->sa_family = AF_INET6;
+addr->ss_family = AF_INET6;
 ((struct sockaddr_in6*)addr)->sin6_port = htons(default_port);
 return true;
 }
 return false;
 }
-std::string addr_to_string(const sockaddr &addr)
+std::string addr_to_string(const sockaddr_storage &addr)
 {
 char peer_str[256];
 bool ok = false;
 int port;
-if (addr.sa_family == AF_INET)
+if (addr.ss_family == AF_INET)
 {
 ok = !!inet_ntop(AF_INET, &((sockaddr_in*)&addr)->sin_addr, peer_str, 256);
 port = ntohs(((sockaddr_in*)&addr)->sin_port);
 }
-else if (addr.sa_family == AF_INET6)
+else if (addr.ss_family == AF_INET6)
 {
 ok = !!inet_ntop(AF_INET6, &((sockaddr_in6*)&addr)->sin6_addr, peer_str, 256);
 port = ntohs(((sockaddr_in6*)&addr)->sin6_port);
 }
 else
-throw std::runtime_error("Unknown address family "+std::to_string(addr.sa_family));
+throw std::runtime_error("Unknown address family "+std::to_string(addr.ss_family));
 if (!ok)
 throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
 return std::string(peer_str)+":"+std::to_string(port);
@@ -186,3 +188,51 @@ std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg, bool
 freeifaddrs(list);
 return addresses;
 }
+int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port)
+{
+sockaddr_storage addr;
+if (!string_to_addr(bind_address, 0, bind_port, &addr))
+{
+throw std::runtime_error("bind address "+bind_address+" is not valid");
+}
+int listen_fd = socket(addr.ss_family, SOCK_STREAM, 0);
+if (listen_fd < 0)
+{
+throw std::runtime_error(std::string("socket: ") + strerror(errno));
+}
+int enable = 1;
+setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
+if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
+{
+close(listen_fd);
+throw std::runtime_error(std::string("bind: ") + strerror(errno));
+}
+if (listening_port)
+{
+if (bind_port == 0)
+{
+socklen_t len = sizeof(addr);
+if (getsockname(listen_fd, (sockaddr *)&addr, &len) == -1)
+{
+close(listen_fd);
+throw std::runtime_error(std::string("getsockname: ") + strerror(errno));
+}
+*listening_port = ntohs(((sockaddr_in*)&addr)->sin_port);
+}
+else
+{
+*listening_port = bind_port;
+}
+}
+if (listen(listen_fd, listen_backlog ? listen_backlog : 128) < 0)
+{
+close(listen_fd);
+throw std::runtime_error(std::string("listen: ") + strerror(errno));
+}
+return listen_fd;
+}

@@ -4,6 +4,7 @@
 #include <string>
 #include <vector>
-bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr);
-std::string addr_to_string(const sockaddr &addr);
+bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr_storage *addr);
+std::string addr_to_string(const sockaddr_storage &addr);
 std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg = std::vector<std::string>(), bool include_v6 = false);
+int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port);
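
Side note (not part of the diff): a minimal usage sketch for the new create_and_bind_socket() helper declared above. Only the signature and the throwing error behaviour come from this changeset; the bind address, backlog and the way errors are reported below are illustrative assumptions.

#include <stdexcept>
#include <stdio.h>
#include <unistd.h>
#include "addr_util.h"

int main()
{
    int listening_port = 0;
    try
    {
        // bind_port == 0 asks the kernel for an ephemeral port; the helper
        // reports the chosen port back through the listening_port pointer.
        int listen_fd = create_and_bind_socket("127.0.0.1", 0, 128, &listening_port);
        printf("listening on port %d\n", listening_port);
        close(listen_fd);
    }
    catch (std::exception & e)
    {
        fprintf(stderr, "%s\n", e.what());
        return 1;
    }
    return 0;
}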

@@ -21,7 +21,7 @@
 // Memory alignment for direct I/O (usually 512 bytes)
 // All other alignments must be a multiple of this one
 #ifndef MEM_ALIGNMENT
-#define MEM_ALIGNMENT 512
+#define MEM_ALIGNMENT 4096
 #endif
 // Default block size is 128 KB, current allowed range is 4K - 128M
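
Side note (not part of the diff): the "4k MEM_ALIGNMENT" commit raises the direct-I/O buffer alignment from 512 to 4096 bytes. The standalone sketch below only illustrates what such an alignment means for buffer allocation; it is not Vitastor code, and posix_memalign() is used here purely as an example of obtaining an aligned buffer.

#include <stdio.h>
#include <stdlib.h>

#ifndef MEM_ALIGNMENT
#define MEM_ALIGNMENT 4096
#endif

int main()
{
    void *buf = NULL;
    // posix_memalign() returns 0 on success and stores an address that is a
    // multiple of MEM_ALIGNMENT; 4 KB satisfies both 512-byte and 4Kn devices.
    if (posix_memalign(&buf, MEM_ALIGNMENT, 128*1024) != 0)
    {
        fprintf(stderr, "aligned allocation failed\n");
        return 1;
    }
    printf("buffer at %p is aligned to %d bytes\n", buf, MEM_ALIGNMENT);
    free(buf);
    return 0;
}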

@@ -415,8 +415,11 @@ stop_flusher:
 flusher->active_flushers++;
 resume_1:
 // Find it in clean_db
-clean_it = bs->clean_db.find(cur.oid);
-old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX);
+{
+auto & clean_db = bs->clean_db_shard(cur.oid);
+auto clean_it = clean_db.find(cur.oid);
+old_clean_loc = (clean_it != clean_db.end() ? clean_it->second.location : UINT64_MAX);
+}
 // Scan dirty versions of the object
 if (!scan_dirty(1))
 {
@@ -870,10 +873,11 @@ void journal_flusher_co::update_clean_db()
 #endif
 bs->data_alloc->set(old_clean_loc >> bs->block_order, false);
 }
+auto & clean_db = bs->clean_db_shard(cur.oid);
 if (has_delete)
 {
-auto clean_it = bs->clean_db.find(cur.oid);
-bs->clean_db.erase(clean_it);
+auto clean_it = clean_db.find(cur.oid);
+clean_db.erase(clean_it);
 #ifdef BLOCKSTORE_DEBUG
 printf("Free block %lu from %lx:%lx v%lu (delete)\n",
 clean_loc >> bs->block_order,
@@ -884,7 +888,7 @@ void journal_flusher_co::update_clean_db()
 }
 else
 {
-bs->clean_db[cur.oid] = {
+clean_db[cur.oid] = {
 .version = cur.version,
 .location = clean_loc,
 };

@@ -49,7 +49,6 @@ class journal_flusher_co
 std::function<void(ring_data_t*)> simple_callback_r, simple_callback_w;
 bool skip_copy, has_delete, has_writes;
-blockstore_clean_db_t::iterator clean_it;
 std::vector<copy_buffer_t> v;
 std::vector<copy_buffer_t>::iterator it;
 int copy_count;

@@ -118,7 +118,7 @@ void blockstore_impl_t::loop()
 // has_writes == 0 - no writes before the current queue item
 // has_writes == 1 - some writes in progress
 // has_writes == 2 - tried to submit some writes, but failed
-int has_writes = 0, op_idx = 0, new_idx = 0;
+int has_writes = 0, op_idx = 0, new_idx = 0, done_lists = 0;
 for (; op_idx < submit_queue.size(); op_idx++, new_idx++)
 {
 auto op = submit_queue[op_idx];
@@ -198,9 +198,14 @@ void blockstore_impl_t::loop()
 }
 else if (op->opcode == BS_OP_LIST)
 {
-// LIST doesn't need to be blocked by previous modifications
-process_list(op);
-wr_st = 2;
+// LIST doesn't have to be blocked by previous modifications
+// But don't do a lot of LISTs at once, because they're blocking and potentially slow
+if (single_tick_list_limit <= 0 || done_lists < single_tick_list_limit)
+{
+process_list(op);
+done_lists++;
+wr_st = 2;
+}
 }
 if (wr_st == 2)
 {
@@ -423,22 +428,104 @@ static bool replace_stable(object_id oid, uint64_t version, int search_start, in
 return false;
 }
+blockstore_clean_db_t& blockstore_impl_t::clean_db_shard(object_id oid)
+{
+uint64_t pg_num = 0;
+uint64_t pool_id = (oid.inode >> (64-POOL_ID_BITS));
+auto sh_it = clean_db_settings.find(pool_id);
+if (sh_it != clean_db_settings.end())
+{
+// like map_to_pg()
+pg_num = (oid.stripe / sh_it->second.pg_stripe_size) % sh_it->second.pg_count + 1;
+}
+return clean_db_shards[(pool_id << (64-POOL_ID_BITS)) | pg_num];
+}
+void blockstore_impl_t::reshard_clean_db(pool_id_t pool, uint32_t pg_count, uint32_t pg_stripe_size)
+{
+uint64_t pool_id = (uint64_t)pool;
+std::map<pool_pg_id_t, blockstore_clean_db_t> new_shards;
+auto sh_it = clean_db_shards.lower_bound((pool_id << (64-POOL_ID_BITS)));
+while (sh_it != clean_db_shards.end() &&
+(sh_it->first >> (64-POOL_ID_BITS)) == pool_id)
+{
+for (auto & pair: sh_it->second)
+{
+// like map_to_pg()
+uint64_t pg_num = (pair.first.stripe / pg_stripe_size) % pg_count + 1;
+uint64_t shard_id = (pool_id << (64-POOL_ID_BITS)) | pg_num;
+new_shards[shard_id][pair.first] = pair.second;
+}
+clean_db_shards.erase(sh_it++);
+}
+for (sh_it = new_shards.begin(); sh_it != new_shards.end(); sh_it++)
+{
+auto & to = clean_db_shards[sh_it->first];
+to.swap(sh_it->second);
+}
+clean_db_settings[pool_id] = (pool_shard_settings_t){
+.pg_count = pg_count,
+.pg_stripe_size = pg_stripe_size,
+};
+}
 void blockstore_impl_t::process_list(blockstore_op_t *op)
 {
-uint32_t list_pg = op->offset;
+uint32_t list_pg = op->offset+1;
 uint32_t pg_count = op->len;
 uint64_t pg_stripe_size = op->oid.stripe;
 uint64_t min_inode = op->oid.inode;
 uint64_t max_inode = op->version;
 // Check PG
-if (pg_count != 0 && (pg_stripe_size < MIN_BLOCK_SIZE || list_pg >= pg_count))
+if (pg_count != 0 && (pg_stripe_size < MIN_BLOCK_SIZE || list_pg > pg_count))
 {
 op->retval = -EINVAL;
 FINISH_OP(op);
 return;
 }
-// Copy clean_db entries (sorted)
-int stable_count = 0, stable_alloc = clean_db.size() / (pg_count ? pg_count : 1);
+// Check if the DB needs resharding
+// (we don't know about PGs from the beginning, we only create "shards" here)
+uint64_t first_shard = 0, last_shard = UINT64_MAX;
+if (min_inode != 0 &&
+// Check if min_inode == max_inode == pool_id<<N, i.e. this is a pool listing
+(min_inode >> (64-POOL_ID_BITS)) == (max_inode >> (64-POOL_ID_BITS)))
+{
+pool_id_t pool_id = (min_inode >> (64-POOL_ID_BITS));
+if (pg_count > 1)
+{
+// Per-pg listing
+auto sh_it = clean_db_settings.find(pool_id);
+if (sh_it == clean_db_settings.end() ||
+sh_it->second.pg_count != pg_count ||
+sh_it->second.pg_stripe_size != pg_stripe_size)
+{
+reshard_clean_db(pool_id, pg_count, pg_stripe_size);
+}
+first_shard = last_shard = ((uint64_t)pool_id << (64-POOL_ID_BITS)) | list_pg;
+}
+else
+{
+// Per-pool listing
+first_shard = ((uint64_t)pool_id << (64-POOL_ID_BITS));
+last_shard = ((uint64_t)(pool_id+1) << (64-POOL_ID_BITS)) - 1;
+}
+}
+// Copy clean_db entries
+int stable_count = 0, stable_alloc = 0;
+if (min_inode != max_inode)
+{
+for (auto shard_it = clean_db_shards.lower_bound(first_shard);
+shard_it != clean_db_shards.end() && shard_it->first <= last_shard;
+shard_it++)
+{
+auto & clean_db = shard_it->second;
+stable_alloc += clean_db.size();
+}
+}
+else
+{
+stable_alloc = 32768;
+}
 obj_ver_id *stable = (obj_ver_id*)malloc(sizeof(obj_ver_id) * stable_alloc);
 if (!stable)
 {
@@ -446,7 +533,11 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
 FINISH_OP(op);
 return;
 }
+for (auto shard_it = clean_db_shards.lower_bound(first_shard);
+shard_it != clean_db_shards.end() && shard_it->first <= last_shard;
+shard_it++)
 {
+auto & clean_db = shard_it->second;
 auto clean_it = clean_db.begin(), clean_end = clean_db.end();
 if ((min_inode != 0 || max_inode != 0) && min_inode <= max_inode)
 {
@@ -461,26 +552,28 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
 }
 for (; clean_it != clean_end; clean_it++)
 {
-if (!pg_count || ((clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
-{
-if (stable_count >= stable_alloc)
-{
-stable_alloc += 32768;
-stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
-if (!stable)
-{
-op->retval = -ENOMEM;
-FINISH_OP(op);
-return;
-}
-}
-stable[stable_count++] = {
-.oid = clean_it->first,
-.version = clean_it->second.version,
-};
-}
+if (stable_count >= stable_alloc)
+{
+stable_alloc *= 2;
+stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
+if (!stable)
+{
+op->retval = -ENOMEM;
+FINISH_OP(op);
+return;
+}
+}
+stable[stable_count++] = {
+.oid = clean_it->first,
+.version = clean_it->second.version,
+};
 }
 }
+if (first_shard != last_shard)
+{
+// If that's not a per-PG listing, sort clean entries
+std::sort(stable, stable+stable_count);
+}
 int clean_stable_count = stable_count;
 // Copy dirty_db entries (sorted, too)
 int unstable_count = 0, unstable_alloc = 0;
@@ -506,7 +599,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
 }
 for (; dirty_it != dirty_end; dirty_it++)
 {
-if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
+if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count + 1) == list_pg) // like map_to_pg()
 {
 if (IS_DELETE(dirty_it->second.state))
 {
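
Side note (not part of the diff): the single_tick_list_limit logic above caps how many LIST operations the blockstore processes per loop iteration, so listings cannot starve other queued operations. The toy event loop below is not Vitastor code; it only illustrates the same "limit expensive blocking jobs per tick" pattern, with all names invented for the example.

#include <deque>
#include <functional>
#include <stdio.h>

struct toy_loop_t
{
    std::deque<std::function<void()>> expensive_jobs;
    int single_tick_limit = 1;

    void tick()
    {
        int done = 0;
        // Run at most single_tick_limit blocking jobs, then yield back to the loop
        while (!expensive_jobs.empty() && done < single_tick_limit)
        {
            expensive_jobs.front()();
            expensive_jobs.pop_front();
            done++;
        }
        // ... a real loop would continue with reads/writes/syncs here ...
    }
};

int main()
{
    toy_loop_t loop;
    for (int i = 0; i < 3; i++)
        loop.expensive_jobs.push_back([i]() { printf("list job %d\n", i); });
    loop.tick(); // only one job runs per tick
    loop.tick();
    loop.tick();
    return 0;
}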

@@ -204,6 +204,17 @@ typedef std::map<obj_ver_id, dirty_entry> blockstore_dirty_db_t;
 #include "blockstore_flush.h"
+typedef uint32_t pool_id_t;
+typedef uint64_t pool_pg_id_t;
+#define POOL_ID_BITS 16
+struct pool_shard_settings_t
+{
+uint32_t pg_count;
+uint32_t pg_stripe_size;
+};
 class blockstore_impl_t
 {
 /******* OPTIONS *******/
@@ -241,11 +252,14 @@ class blockstore_impl_t
 int throttle_target_parallelism = 1;
 // Minimum difference in microseconds between target and real execution times to throttle the response
 int throttle_threshold_us = 50;
+// Maximum number of LIST operations to be processed between
+int single_tick_list_limit = 1;
 /******* END OF OPTIONS *******/
 struct ring_consumer_t ring_consumer;
-blockstore_clean_db_t clean_db;
+std::map<pool_id_t, pool_shard_settings_t> clean_db_settings;
+std::map<pool_pg_id_t, blockstore_clean_db_t> clean_db_shards;
 uint8_t *clean_bitmap = NULL;
 blockstore_dirty_db_t dirty_db;
 std::vector<blockstore_op_t*> submit_queue;
@@ -294,6 +308,9 @@ class blockstore_impl_t
 void open_journal();
 uint8_t* get_clean_entry_bitmap(uint64_t block_loc, int offset);
+blockstore_clean_db_t& clean_db_shard(object_id oid);
+void reshard_clean_db(pool_id_t pool_id, uint32_t pg_count, uint32_t pg_stripe_size);
 // Journaling
 void prepare_journal_sector_write(int sector, blockstore_op_t *op);
 void handle_journal_write(ring_data_t *data, uint64_t flush_id);
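
Side note (not part of the diff): a self-contained sketch of the shard-key packing used by the new clean_db_shards map, as far as it can be read from this changeset: the pool id occupies the top POOL_ID_BITS of a 64-bit key and the 1-based PG number the low bits. The concrete inode, stripe and PG parameters below are made-up example values, not anything from the repository.

#include <stdint.h>
#include <stdio.h>

#define POOL_ID_BITS 16

static uint64_t shard_key(uint64_t inode, uint64_t stripe, uint32_t pg_count, uint32_t pg_stripe_size)
{
    uint64_t pool_id = inode >> (64-POOL_ID_BITS);
    // like map_to_pg(): the object stripe selects the PG, PG numbers start from 1
    uint64_t pg_num = (stripe / pg_stripe_size) % pg_count + 1;
    return (pool_id << (64-POOL_ID_BITS)) | pg_num;
}

int main()
{
    // "inode 123 in pool 2" with a 128 KB stripe at offset 12 stripes, 256 PGs
    uint64_t inode = ((uint64_t)2 << (64-POOL_ID_BITS)) | 123;
    uint64_t key = shard_key(inode, 12*128*1024, 256, 128*1024);
    printf("shard key = 0x%016llx\n", (unsigned long long)key);
    return 0;
}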

@@ -222,10 +222,11 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
 }
 if (entry->oid.inode > 0)
 {
-auto clean_it = bs->clean_db.find(entry->oid);
-if (clean_it == bs->clean_db.end() || clean_it->second.version < entry->version)
+auto & clean_db = bs->clean_db_shard(entry->oid);
+auto clean_it = clean_db.find(entry->oid);
+if (clean_it == clean_db.end() || clean_it->second.version < entry->version)
 {
-if (clean_it != bs->clean_db.end())
+if (clean_it != clean_db.end())
 {
 // free the previous block
 #ifdef BLOCKSTORE_DEBUG
@@ -245,7 +246,7 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
 printf("Allocate block (clean entry) %lu: %lx:%lx v%lu\n", done_cnt+i, entry->oid.inode, entry->oid.stripe, entry->version);
 #endif
 bs->data_alloc->set(done_cnt+i, true);
-bs->clean_db[entry->oid] = (struct clean_entry){
+clean_db[entry->oid] = (struct clean_entry){
 .version = entry->version,
 .location = (done_cnt+i) << block_order,
 };
@@ -656,8 +657,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
 init_write_sector = proc_pos;
 return 0;
 }
-auto clean_it = bs->clean_db.find(je->small_write.oid);
-if (clean_it == bs->clean_db.end() ||
+auto & clean_db = bs->clean_db_shard(je->small_write.oid);
+auto clean_it = clean_db.find(je->small_write.oid);
+if (clean_it == clean_db.end() ||
 clean_it->second.version < je->small_write.version)
 {
 obj_ver_id ov = {
@@ -735,8 +737,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
 erase_dirty_object(dirty_it);
 }
 }
-auto clean_it = bs->clean_db.find(je->big_write.oid);
-if (clean_it == bs->clean_db.end() ||
+auto & clean_db = bs->clean_db_shard(je->big_write.oid);
+auto clean_it = clean_db.find(je->big_write.oid);
+if (clean_it == clean_db.end() ||
 clean_it->second.version < je->big_write.version)
 {
 // oid, version, block
@@ -841,8 +844,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
 dirty_it--;
 dirty_exists = dirty_it->first.oid == je->del.oid;
 }
-auto clean_it = bs->clean_db.find(je->del.oid);
-bool clean_exists = (clean_it != bs->clean_db.end() &&
+auto & clean_db = bs->clean_db_shard(je->del.oid);
+auto clean_it = clean_db.find(je->del.oid);
+bool clean_exists = (clean_it != clean_db.end() &&
 clean_it->second.version < je->del.version);
 if (!clean_exists && dirty_exists)
 {
@@ -901,8 +905,9 @@ void blockstore_init_journal::erase_dirty_object(blockstore_dirty_db_t::iterator
 break;
 }
 }
-auto clean_it = bs->clean_db.find(oid);
-uint64_t clean_loc = clean_it != bs->clean_db.end()
+auto & clean_db = bs->clean_db_shard(oid);
+auto clean_it = clean_db.find(oid);
+uint64_t clean_loc = clean_it != clean_db.end()
 ? clean_it->second.location : UINT64_MAX;
 if (exists && clean_loc == UINT64_MAX)
 {

@@ -111,6 +111,7 @@ uint8_t* blockstore_impl_t::get_clean_entry_bitmap(uint64_t block_loc, int offse
 int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
 {
+auto & clean_db = clean_db_shard(read_op->oid);
 auto clean_it = clean_db.find(read_op->oid);
 auto dirty_it = dirty_db.upper_bound((obj_ver_id){
 .oid = read_op->oid,
@@ -297,6 +298,7 @@ int blockstore_impl_t::read_bitmap(object_id oid, uint64_t target_version, void
 dirty_it--;
 }
 }
+auto & clean_db = clean_db_shard(oid);
 auto clean_it = clean_db.find(oid);
 if (clean_it != clean_db.end())
 {

@@ -54,6 +54,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
 auto dirty_it = dirty_db.find(*v);
 if (dirty_it == dirty_db.end())
 {
+auto & clean_db = clean_db_shard(v->oid);
 auto clean_it = clean_db.find(v->oid);
 if (clean_it == clean_db.end() || clean_it->second.version < v->version)
 {
@@ -188,6 +189,7 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty)
 }
 if (exists == -1)
 {
+auto & clean_db = clean_db_shard(v.oid);
 auto clean_it = clean_db.find(v.oid);
 exists = clean_it != clean_db.end() ? 1 : 0;
 }
@@ -215,6 +217,7 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty)
 break;
 }
 }
+auto & clean_db = clean_db_shard(v.oid);
 auto clean_it = clean_db.find(v.oid);
 uint64_t clean_loc = clean_it != clean_db.end()
 ? clean_it->second.location : UINT64_MAX;

@@ -41,6 +41,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
 }
 if (!found)
 {
+auto & clean_db = clean_db_shard(op->oid);
 auto clean_it = clean_db.find(op->oid);
 if (clean_it != clean_db.end())
 {

@@ -154,7 +154,7 @@ resume_1:
 if (pool_it != parent->cli->st_cli.pool_config.end())
 {
 auto & pool_cfg = pool_it->second;
-used_size = used_size / pool_pg_real_size[pool_id]
+used_size = used_size / (pool_pg_real_size[pool_id] ? pool_pg_real_size[pool_id] : 1)
 * (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
 }
 auto stat_it = stats.find(inode_num);

@@ -1,211 +1,566 @@
// Copyright (c) Vitaliy Filippov, 2019+ // Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details) // License: VNPL-1.1 (see README.md for details)
#include <fcntl.h>
#include "cli.h" #include "cli.h"
#include "cluster_client.h" #include "cluster_client.h"
#include "base64.h"
#define RM_LISTING 1 // Remove layer(s): similar to merge, but alters metadata and processes multiple merge targets
#define RM_REMOVING 2 //
#define RM_END 3 // Exactly one child of the requested layers may be merged using the "inverted" workflow,
// where we merge it "down" into one of the "to-be-removed" layers and then rename the
struct rm_pg_t // "to-be-removed" layer to the child. It may be done either if all writers are stopped
// before trying to delete layers (which is signaled by --writers-stopped) or if that child
// is a read-only layer (snapshot) itself.
//
// This "inverted" workflow trades copying data of one of the deleted layers for copying
// data of one child of the chain which is also a child of the "traded" layer. So we
// choose the (parent,child) pair which has the largest difference between "parent" and
// "child" inode sizes.
//
// All other children of the chain are processed by iterating though them, merging removed
// parents into them and rebasing them to the last layer which isn't a member of the removed
// chain.
//
// Example:
//
// <parent> - <from> - <layer 2> - <to> - <child 1>
// \ \ \- <child 2>
// \ \- <child 3>
// \-<child 4>
//
// 1) Find optimal pair for the "reverse" scenario
// Imagine that it's (<layer 2>, <child 1>) in this example
// 2) Process all children except <child 1>:
// - Merge <from>..<to> to <child 2>
// - Set <child 2> parent to <parent>
// - Repeat for others
// 3) Process <child 1>:
// - Merge <from>..<child 1> to <layer 2>
// - Set <layer 2> parent to <parent>
// - Rename <layer 2> to <child 1>
// 4) Delete other layers of the chain (<from>, <to>)
struct snap_remover_t
{ {
pg_num_t pg_num; cli_tool_t *parent;
osd_num_t rm_osd_num;
std::set<object_id> objects; // remove from..to
std::set<object_id>::iterator obj_pos; std::string from_name, to_name;
uint64_t obj_count = 0, obj_done = 0; // writers are stopped, we can safely change writable layers
bool writers_stopped = false;
// use CAS writes (0 = never, 1 = auto, 2 = always)
int use_cas = 1;
// interval between fsyncs
int fsync_interval = 128;
std::map<inode_t,int> sources;
std::map<inode_t,uint64_t> inode_used;
std::vector<inode_t> merge_children;
std::vector<inode_t> chain_list;
std::map<inode_t,int> inverse_candidates;
inode_t inverse_parent = 0, inverse_child = 0;
inode_t new_parent = 0;
int state = 0; int state = 0;
int in_flight = 0; int current_child = 0;
}; std::function<bool(void)> cb;
struct rm_inode_t bool is_done()
{
uint64_t inode = 0;
pool_id_t pool_id = 0;
uint64_t min_offset = 0;
cli_tool_t *parent = NULL;
inode_list_t *lister = NULL;
std::vector<rm_pg_t*> lists;
uint64_t total_count = 0, total_done = 0, total_prev_pct = 0;
uint64_t pgs_to_list = 0;
bool lists_done = false;
int state = 0;
void start_delete()
{ {
lister = parent->cli->list_inode_start(inode, [this](inode_list_t *lst, return state == 9;
std::set<object_id>&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status) }
void loop()
{
if (state == 1)
goto resume_1;
else if (state == 2)
goto resume_2;
else if (state == 3)
goto resume_3;
else if (state == 4)
goto resume_4;
else if (state == 5)
goto resume_5;
else if (state == 6)
goto resume_6;
else if (state == 7)
goto resume_7;
else if (state == 8)
goto resume_8;
else if (state == 9)
goto resume_9;
// Get children to merge
get_merge_children();
// Try to select an inode for the "inverse" optimized scenario
// Read statistics from etcd to do it
read_stats();
state = 1;
resume_1:
if (parent->waiting > 0)
return;
choose_inverse_candidate();
// Merge children one by one, except our "inverse" child
for (current_child = 0; current_child < merge_children.size(); current_child++)
{ {
rm_pg_t *rm = new rm_pg_t((rm_pg_t){ if (merge_children[current_child] == inverse_child)
.pg_num = pg_num, continue;
.rm_osd_num = primary_osd, start_merge_child(merge_children[current_child], merge_children[current_child]);
.objects = objects, resume_2:
.obj_count = objects.size(), while (!cb())
.obj_done = 0,
});
if (min_offset == 0)
{ {
total_count += objects.size(); state = 2;
return;
} }
else cb = NULL;
{ parent->change_parent(merge_children[current_child], new_parent);
for (object_id oid: objects) state = 3;
{ resume_3:
if (oid.stripe >= min_offset) if (parent->waiting > 0)
{ return;
total_count++; }
} // Merge our "inverse" child into our "inverse" parent
} if (inverse_child != 0)
}
rm->obj_pos = rm->objects.begin();
lists.push_back(rm);
if (parent->list_first)
{
parent->cli->list_inode_next(lister, 1);
}
if (status & INODE_LIST_DONE)
{
lists_done = true;
}
pgs_to_list--;
continue_delete();
});
if (!lister)
{ {
fprintf(stderr, "Failed to list inode %lu from pool %u objects\n", INODE_NO_POOL(inode), INODE_POOL(inode)); start_merge_child(inverse_child, inverse_parent);
resume_4:
while (!cb())
{
state = 4;
return;
}
cb = NULL;
// Delete "inverse" child data
start_delete_source(inverse_child);
resume_5:
while (!cb())
{
state = 5;
return;
}
cb = NULL;
// Delete "inverse" child metadata, rename parent over it,
// and also change parent links of the previous "inverse" child
rename_inverse_parent();
state = 6;
resume_6:
if (parent->waiting > 0)
return;
}
// Delete parents, except the "inverse" one
for (current_child = 0; current_child < chain_list.size(); current_child++)
{
if (chain_list[current_child] == inverse_parent)
continue;
start_delete_source(chain_list[current_child]);
resume_7:
while (!cb())
{
state = 7;
return;
}
cb = NULL;
delete_inode_config(chain_list[current_child]);
state = 8;
resume_8:
if (parent->waiting > 0)
return;
}
state = 9;
resume_9:
// Done
return;
}
void get_merge_children()
{
// Get all children of from..to
inode_config_t *from_cfg = parent->get_inode_cfg(from_name);
inode_config_t *to_cfg = parent->get_inode_cfg(to_name);
// Check that to_cfg is actually a child of from_cfg
// FIXME de-copypaste the following piece of code with snap_merger_t
inode_config_t *cur = to_cfg;
chain_list.push_back(cur->num);
while (cur->num != from_cfg->num && cur->parent_id != 0)
{
auto it = parent->cli->st_cli.inode_config.find(cur->parent_id);
if (it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Parent inode of layer %s (id %ld) not found\n", cur->name.c_str(), cur->parent_id);
exit(1);
}
cur = &it->second;
chain_list.push_back(cur->num);
}
if (cur->num != from_cfg->num)
{
fprintf(stderr, "Layer %s is not a child of %s\n", to_name.c_str(), from_name.c_str());
exit(1); exit(1);
} }
pgs_to_list = parent->cli->list_pg_count(lister); new_parent = from_cfg->parent_id;
parent->cli->list_inode_next(lister, parent->parallel_osds); // Calculate ranks
} int i = chain_list.size()-1;
for (inode_t item: chain_list)
void send_ops(rm_pg_t *cur_list)
{
if (parent->cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) ==
parent->cli->msgr.osd_peer_fds.end())
{ {
// Initiate connection sources[item] = i--;
parent->cli->msgr.connect_peer(cur_list->rm_osd_num, parent->cli->st_cli.peer_states[cur_list->rm_osd_num]);
return;
} }
while (cur_list->in_flight < parent->iodepth && cur_list->obj_pos != cur_list->objects.end()) for (auto & ic: parent->cli->st_cli.inode_config)
{ {
if (cur_list->obj_pos->stripe >= min_offset) if (!ic.second.parent_id)
{ {
osd_op_t *op = new osd_op_t(); continue;
op->op_type = OSD_OP_OUT;
op->peer_fd = parent->cli->msgr.osd_peer_fds[cur_list->rm_osd_num];
op->req = (osd_any_op_t){
.rw = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = parent->cli->next_op_id(),
.opcode = OSD_OP_DELETE,
},
.inode = cur_list->obj_pos->inode,
.offset = cur_list->obj_pos->stripe,
.len = 0,
},
};
op->callback = [this, cur_list](osd_op_t *op)
{
cur_list->in_flight--;
if (op->reply.hdr.retval < 0)
{
fprintf(stderr, "Failed to remove object %lx:%lx from PG %u (OSD %lu) (retval=%ld)\n",
op->req.rw.inode, op->req.rw.offset,
cur_list->pg_num, cur_list->rm_osd_num, op->reply.hdr.retval);
}
delete op;
cur_list->obj_done++;
total_done++;
continue_delete();
};
cur_list->in_flight++;
parent->cli->msgr.outbox_push(op);
} }
cur_list->obj_pos++; auto it = sources.find(ic.second.parent_id);
} if (it != sources.end() && sources.find(ic.second.num) == sources.end())
}
void continue_delete()
{
if (parent->list_first && !lists_done)
{
return;
}
for (int i = 0; i < lists.size(); i++)
{
if (!lists[i]->in_flight && lists[i]->obj_pos == lists[i]->objects.end())
{ {
delete lists[i]; merge_children.push_back(ic.second.num);
lists.erase(lists.begin()+i, lists.begin()+i+1); if (ic.second.readonly || writers_stopped)
i--;
if (!lists_done)
{ {
parent->cli->list_inode_next(lister, 1); inverse_candidates[ic.second.num] = it->second;
} }
} }
else
{
send_ops(lists[i]);
}
}
if (parent->progress && total_count > 0 && total_done*1000/total_count != total_prev_pct)
{
printf("\rRemoved %lu/%lu objects, %lu more PGs to list...", total_done, total_count, pgs_to_list);
total_prev_pct = total_done*1000/total_count;
}
if (lists_done && !lists.size())
{
printf("Done, inode %lu in pool %u data removed\n", INODE_NO_POOL(inode), pool_id);
state = 2;
} }
} }
bool loop() void read_stats()
{ {
if (state == 0) if (inverse_candidates.size() == 0)
{ {
start_delete(); return;
state = 1;
} }
else if (state == 1) json11::Json::array reads;
for (auto cp: inverse_candidates)
{ {
continue_delete(); inode_t inode = cp.first;
reads.push_back(json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
"/"+std::to_string(INODE_NO_POOL(inode))
) },
} }
});
} }
else if (state == 2) for (auto cp: sources)
{ {
return true; inode_t inode = cp.first;
reads.push_back(json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
"/"+std::to_string(INODE_NO_POOL(inode))
) },
} }
});
} }
return false; parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "success", reads },
}, [this](std::string err, json11::Json data)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error reading layer statistics from etcd: %s\n", err.c_str());
exit(1);
}
for (auto inode_result: data["responses"].array_items())
{
auto kv = parent->cli->st_cli.parse_etcd_kv(inode_result["kvs"][0]);
pool_id_t pool_id = 0;
inode_t inode = 0;
char null_byte = 0;
sscanf(kv.key.c_str() + parent->cli->st_cli.etcd_prefix.length()+13, "%u/%lu%c", &pool_id, &inode, &null_byte);
if (!inode || null_byte != 0)
{
fprintf(stderr, "Bad key returned from etcd: %s\n", kv.key.c_str());
exit(1);
}
auto pool_cfg_it = parent->cli->st_cli.pool_config.find(pool_id);
if (pool_cfg_it == parent->cli->st_cli.pool_config.end())
{
fprintf(stderr, "Pool %u does not exist\n", pool_id);
exit(1);
}
inode = INODE_WITH_POOL(pool_id, inode);
auto & pool_cfg = pool_cfg_it->second;
uint64_t used_bytes = kv.value["raw_used"].uint64_value() / pool_cfg.pg_size;
if (pool_cfg.scheme != POOL_SCHEME_REPLICATED)
{
used_bytes *= (pool_cfg.pg_size - pool_cfg.parity_chunks);
}
inode_used[inode] = used_bytes;
}
parent->ringloop->wakeup();
});
}
void choose_inverse_candidate()
{
uint64_t max_diff = 0;
for (auto cp: inverse_candidates)
{
inode_t child = cp.first;
uint64_t child_used = inode_used[child];
int rank = cp.second;
for (int i = chain_list.size()-rank; i < chain_list.size(); i++)
{
inode_t parent = chain_list[i];
uint64_t parent_used = inode_used[parent];
if (parent_used > child_used && (!max_diff || max_diff < (parent_used-child_used)))
{
max_diff = (parent_used-child_used);
inverse_parent = parent;
inverse_child = child;
}
}
}
}
void rename_inverse_parent()
{
auto child_it = parent->cli->st_cli.inode_config.find(inverse_child);
if (child_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inverse_child);
exit(1);
}
auto target_it = parent->cli->st_cli.inode_config.find(inverse_parent);
if (target_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inverse_parent);
exit(1);
}
inode_config_t *child_cfg = &child_it->second;
inode_config_t *target_cfg = &target_it->second;
std::string child_name = child_cfg->name;
std::string target_name = target_cfg->name;
std::string child_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(inverse_child))+
"/"+std::to_string(INODE_NO_POOL(inverse_child))
);
std::string target_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(inverse_parent))+
"/"+std::to_string(INODE_NO_POOL(inverse_parent))
);
// Fill new configuration
inode_config_t new_cfg = *child_cfg;
new_cfg.num = target_cfg->num;
new_cfg.parent_id = new_parent;
json11::Json::array cmp = json11::Json::array {
json11::Json::object {
{ "target", "MOD" },
{ "key", child_cfg_key },
{ "result", "LESS" },
{ "mod_revision", child_cfg->mod_revision+1 },
},
json11::Json::object {
{ "target", "MOD" },
{ "key", target_cfg_key },
{ "result", "LESS" },
{ "mod_revision", target_cfg->mod_revision+1 },
},
};
json11::Json::array txn = json11::Json::array {
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", child_cfg_key },
} },
},
json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", target_cfg_key },
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&new_cfg)).dump()) },
} },
},
json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/index/image/"+child_cfg->name) },
{ "value", base64_encode(json11::Json({
{ "id", INODE_NO_POOL(inverse_parent) },
{ "pool_id", (uint64_t)INODE_POOL(inverse_parent) },
}).dump()) },
} },
},
};
// Reparent children of inverse_child
for (auto & cp: parent->cli->st_cli.inode_config)
{
if (cp.second.parent_id == child_cfg->num)
{
auto cp_cfg = cp.second;
cp_cfg.parent_id = inverse_parent;
auto cp_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(cp.second.num))+
"/"+std::to_string(INODE_NO_POOL(cp.second.num))
);
cmp.push_back(json11::Json::object {
{ "target", "MOD" },
{ "key", cp_key },
{ "result", "LESS" },
{ "mod_revision", cp.second.mod_revision+1 },
});
txn.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", cp_key },
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&cp_cfg)).dump()) },
} },
});
}
}
parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "compare", cmp },
{ "success", txn },
}, [this, target_name, child_name](std::string err, json11::Json res)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error renaming %s to %s: %s\n", target_name.c_str(), child_name.c_str(), err.c_str());
exit(1);
}
if (!res["succeeded"].bool_value())
{
fprintf(
stderr, "Parent (%s), child (%s), or one of its children"
" configuration was modified during rename\n", target_name.c_str(), child_name.c_str()
);
exit(1);
}
printf("Layer %s renamed to %s\n", target_name.c_str(), child_name.c_str());
parent->ringloop->wakeup();
});
}
void delete_inode_config(inode_t cur)
{
auto cur_cfg_it = parent->cli->st_cli.inode_config.find(cur);
if (cur_cfg_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode 0x%lx disappeared\n", cur);
exit(1);
}
inode_config_t *cur_cfg = &cur_cfg_it->second;
std::string cur_name = cur_cfg->name;
std::string cur_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(cur))+
"/"+std::to_string(INODE_NO_POOL(cur))
);
parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "compare", json11::Json::array {
json11::Json::object {
{ "target", "MOD" },
{ "key", cur_cfg_key },
{ "result", "LESS" },
{ "mod_revision", cur_cfg->mod_revision+1 },
},
} },
{ "success", json11::Json::array {
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", cur_cfg_key },
} },
},
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/index/image/"+cur_name) },
} },
},
} },
}, [this, cur_name](std::string err, json11::Json res)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error deleting %s: %s\n", cur_name.c_str(), err.c_str());
exit(1);
}
if (!res["succeeded"].bool_value())
{
fprintf(stderr, "Layer %s configuration was modified during deletion\n", cur_name.c_str());
exit(1);
}
printf("Layer %s deleted\n", cur_name.c_str());
parent->ringloop->wakeup();
});
}
void start_merge_child(inode_t child_inode, inode_t target_inode)
{
auto child_it = parent->cli->st_cli.inode_config.find(child_inode);
if (child_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", child_inode);
exit(1);
}
auto target_it = parent->cli->st_cli.inode_config.find(target_inode);
if (target_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", target_inode);
exit(1);
}
cb = parent->start_merge(json11::Json::object {
{ "command", json11::Json::array{ "merge-data", from_name, child_it->second.name } },
{ "target", target_it->second.name },
{ "delete-source", false },
{ "cas", use_cas },
{ "fsync-interval", fsync_interval },
});
}
void start_delete_source(inode_t inode)
{
auto source = parent->cli->st_cli.inode_config.find(inode);
if (source == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inode);
exit(1);
}
cb = parent->start_rm(json11::Json::object {
{ "inode", inode },
{ "pool", (uint64_t)INODE_POOL(inode) },
{ "fsync-interval", fsync_interval },
});
} }
 };
-std::function<bool(void)> cli_tool_t::start_rm(json11::Json cfg)
+std::function<bool(void)> cli_tool_t::start_snap_rm(json11::Json cfg)
 {
-    auto remover = new rm_inode_t();
-    remover->parent = this;
-    remover->inode = cfg["inode"].uint64_value();
-    remover->pool_id = cfg["pool"].uint64_value();
-    if (remover->pool_id)
-    {
-        remover->inode = (remover->inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (((uint64_t)remover->pool_id) << (64-POOL_ID_BITS));
-    }
-    remover->pool_id = INODE_POOL(remover->inode);
-    if (!remover->pool_id)
-    {
-        fprintf(stderr, "pool is missing\n");
-        exit(1);
-    }
-    remover->min_offset = cfg["min-offset"].uint64_value();
-    return [remover]()
-    {
-        if (remover->loop())
-        {
-            delete remover;
-            return true;
-        }
-        return false;
+    json11::Json::array cmd = cfg["command"].array_items();
+    auto snap_remover = new snap_remover_t();
+    snap_remover->parent = this;
+    snap_remover->from_name = cmd.size() > 1 ? cmd[1].string_value() : "";
+    snap_remover->to_name = cmd.size() > 2 ? cmd[2].string_value() : "";
+    if (snap_remover->from_name == "")
+    {
+        fprintf(stderr, "Layer to remove argument is missing\n");
+        exit(1);
+    }
+    if (snap_remover->to_name == "")
+    {
+        snap_remover->to_name = snap_remover->from_name;
+    }
+    snap_remover->fsync_interval = cfg["fsync-interval"].uint64_value();
+    if (!snap_remover->fsync_interval)
+        snap_remover->fsync_interval = 128;
+    if (!cfg["cas"].is_null())
+        snap_remover->use_cas = cfg["cas"].uint64_value() ? 2 : 0;
+    if (!cfg["writers_stopped"].is_null())
+        snap_remover->writers_stopped = true;
+    return [snap_remover]()
+    {
+        snap_remover->loop();
+        if (snap_remover->is_done())
+        {
+            delete snap_remover;
+            return true;
+        }
+        return false;

src/cli_rm_data.cpp (new file, 214 lines)

@@ -0,0 +1,214 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
#include "cli.h"
#include "cluster_client.h"
#define RM_LISTING 1
#define RM_REMOVING 2
#define RM_END 3
struct rm_pg_t
{
pg_num_t pg_num;
osd_num_t rm_osd_num;
std::set<object_id> objects;
std::set<object_id>::iterator obj_pos;
uint64_t obj_count = 0, obj_done = 0;
int state = 0;
int in_flight = 0;
};
struct rm_inode_t
{
uint64_t inode = 0;
pool_id_t pool_id = 0;
uint64_t min_offset = 0;
cli_tool_t *parent = NULL;
inode_list_t *lister = NULL;
std::vector<rm_pg_t*> lists;
uint64_t total_count = 0, total_done = 0, total_prev_pct = 0;
uint64_t pgs_to_list = 0;
bool lists_done = false;
int state = 0;
void start_delete()
{
lister = parent->cli->list_inode_start(inode, [this](inode_list_t *lst,
std::set<object_id>&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)
{
rm_pg_t *rm = new rm_pg_t((rm_pg_t){
.pg_num = pg_num,
.rm_osd_num = primary_osd,
.objects = objects,
.obj_count = objects.size(),
.obj_done = 0,
});
if (min_offset == 0)
{
total_count += objects.size();
}
else
{
for (object_id oid: objects)
{
if (oid.stripe >= min_offset)
{
total_count++;
}
}
}
rm->obj_pos = rm->objects.begin();
lists.push_back(rm);
if (parent->list_first)
{
parent->cli->list_inode_next(lister, 1);
}
if (status & INODE_LIST_DONE)
{
lists_done = true;
}
pgs_to_list--;
continue_delete();
});
if (!lister)
{
fprintf(stderr, "Failed to list inode %lu from pool %u objects\n", INODE_NO_POOL(inode), INODE_POOL(inode));
exit(1);
}
pgs_to_list = parent->cli->list_pg_count(lister);
parent->cli->list_inode_next(lister, parent->parallel_osds);
}
void send_ops(rm_pg_t *cur_list)
{
if (parent->cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) ==
parent->cli->msgr.osd_peer_fds.end())
{
// Initiate connection
parent->cli->msgr.connect_peer(cur_list->rm_osd_num, parent->cli->st_cli.peer_states[cur_list->rm_osd_num]);
return;
}
while (cur_list->in_flight < parent->iodepth && cur_list->obj_pos != cur_list->objects.end())
{
if (cur_list->obj_pos->stripe >= min_offset)
{
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
// Already checked that it exists above, but anyway
op->peer_fd = parent->cli->msgr.osd_peer_fds.at(cur_list->rm_osd_num);
op->req = (osd_any_op_t){
.rw = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = parent->cli->next_op_id(),
.opcode = OSD_OP_DELETE,
},
.inode = cur_list->obj_pos->inode,
.offset = cur_list->obj_pos->stripe,
.len = 0,
},
};
op->callback = [this, cur_list](osd_op_t *op)
{
cur_list->in_flight--;
if (op->reply.hdr.retval < 0)
{
fprintf(stderr, "Failed to remove object %lx:%lx from PG %u (OSD %lu) (retval=%ld)\n",
op->req.rw.inode, op->req.rw.offset,
cur_list->pg_num, cur_list->rm_osd_num, op->reply.hdr.retval);
}
delete op;
cur_list->obj_done++;
total_done++;
continue_delete();
};
cur_list->in_flight++;
parent->cli->msgr.outbox_push(op);
}
cur_list->obj_pos++;
}
}
void continue_delete()
{
if (parent->list_first && !lists_done)
{
return;
}
for (int i = 0; i < lists.size(); i++)
{
if (!lists[i]->in_flight && lists[i]->obj_pos == lists[i]->objects.end())
{
delete lists[i];
lists.erase(lists.begin()+i, lists.begin()+i+1);
i--;
if (!lists_done)
{
parent->cli->list_inode_next(lister, 1);
}
}
else
{
send_ops(lists[i]);
}
}
if (parent->progress && total_count > 0 && total_done*1000/total_count != total_prev_pct)
{
printf("\rRemoved %lu/%lu objects, %lu more PGs to list...", total_done, total_count, pgs_to_list);
total_prev_pct = total_done*1000/total_count;
}
if (lists_done && !lists.size())
{
printf("Done, inode %lu in pool %u data removed\n", INODE_NO_POOL(inode), pool_id);
state = 2;
}
}
bool loop()
{
if (state == 0)
{
start_delete();
state = 1;
}
else if (state == 1)
{
continue_delete();
}
else if (state == 2)
{
return true;
}
return false;
}
};
std::function<bool(void)> cli_tool_t::start_rm(json11::Json cfg)
{
auto remover = new rm_inode_t();
remover->parent = this;
remover->inode = cfg["inode"].uint64_value();
remover->pool_id = cfg["pool"].uint64_value();
if (remover->pool_id)
{
remover->inode = (remover->inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (((uint64_t)remover->pool_id) << (64-POOL_ID_BITS));
}
remover->pool_id = INODE_POOL(remover->inode);
if (!remover->pool_id)
{
fprintf(stderr, "pool is missing\n");
exit(1);
}
remover->min_offset = cfg["min-offset"].uint64_value();
return [remover]()
{
if (remover->loop())
{
delete remover;
return true;
}
return false;
};
}
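
The command above follows the same convention as the other cli_tool_t commands: start_rm() builds a state object and hands back a std::function<bool(void)> that the caller polls until it returns true, at which point the state object has already deleted itself. A self-contained sketch of that convention (fake_remover and the spinning driver loop are illustrative stand-ins, not Vitastor code):

#include <functional>
#include <cstdio>

struct fake_remover
{
    int ticks = 0;
    bool loop() { return ++ticks >= 3; }   // pretend the removal finishes after 3 polls
};

static std::function<bool(void)> start_fake_rm()
{
    auto remover = new fake_remover();
    return [remover]()
    {
        if (remover->loop())
        {
            delete remover;   // state object frees itself once done
            return true;
        }
        return false;
    };
}

int main()
{
    auto action = start_fake_rm();
    while (!action())
        ;   // a real caller runs the ring loop / epoll here instead of spinning
    printf("done\n");
}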


@@ -1,568 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
#include <fcntl.h>
#include "cli.h"
#include "cluster_client.h"
#include "base64.h"
// Remove layer(s): similar to merge, but alters metadata and processes multiple merge targets
//
// Exactly one child of the requested layers may be merged using the "inverted" workflow,
// where we merge it "down" into one of the "to-be-removed" layers and then rename the
// "to-be-removed" layer to the child. It may be done either if all writers are stopped
// before trying to delete layers (which is signaled by --writers-stopped) or if that child
// is a read-only layer (snapshot) itself.
//
// This "inverted" workflow trades copying data of one of the deleted layers for copying
// data of one child of the chain which is also a child of the "traded" layer. So we
// choose the (parent,child) pair which has the largest difference between "parent" and
// "child" inode sizes.
//
// All other children of the chain are processed by iterating though them, merging removed
// parents into them and rebasing them to the last layer which isn't a member of the removed
// chain.
//
// Example:
//
// <parent> - <from> - <layer 2> - <to> - <child 1>
// \ \ \- <child 2>
// \ \- <child 3>
// \-<child 4>
//
// 1) Find optimal pair for the "reverse" scenario
// Imagine that it's (<layer 2>, <child 1>) in this example
// 2) Process all children except <child 1>:
// - Merge <from>..<to> to <child 2>
// - Set <child 2> parent to <parent>
// - Repeat for others
// 3) Process <child 1>:
// - Merge <from>..<child 1> to <layer 2>
// - Set <layer 2> parent to <parent>
// - Rename <layer 2> to <child 1>
// 4) Delete other layers of the chain (<from>, <to>)
struct snap_remover_t
{
cli_tool_t *parent;
// remove from..to
std::string from_name, to_name;
// writers are stopped, we can safely change writable layers
bool writers_stopped = false;
// use CAS writes (0 = never, 1 = auto, 2 = always)
int use_cas = 1;
// interval between fsyncs
int fsync_interval = 128;
std::map<inode_t,int> sources;
std::map<inode_t,uint64_t> inode_used;
std::vector<inode_t> merge_children;
std::vector<inode_t> chain_list;
std::map<inode_t,int> inverse_candidates;
inode_t inverse_parent = 0, inverse_child = 0;
inode_t new_parent = 0;
int state = 0;
int current_child = 0;
std::function<bool(void)> cb;
bool is_done()
{
return state == 9;
}
void loop()
{
if (state == 1)
goto resume_1;
else if (state == 2)
goto resume_2;
else if (state == 3)
goto resume_3;
else if (state == 4)
goto resume_4;
else if (state == 5)
goto resume_5;
else if (state == 6)
goto resume_6;
else if (state == 7)
goto resume_7;
else if (state == 8)
goto resume_8;
else if (state == 9)
goto resume_9;
// Get children to merge
get_merge_children();
// Try to select an inode for the "inverse" optimized scenario
// Read statistics from etcd to do it
read_stats();
state = 1;
resume_1:
if (parent->waiting > 0)
return;
choose_inverse_candidate();
// Merge children one by one, except our "inverse" child
for (current_child = 0; current_child < merge_children.size(); current_child++)
{
if (merge_children[current_child] == inverse_child)
continue;
start_merge_child(merge_children[current_child], merge_children[current_child]);
resume_2:
while (!cb())
{
state = 2;
return;
}
cb = NULL;
parent->change_parent(merge_children[current_child], new_parent);
state = 3;
resume_3:
if (parent->waiting > 0)
return;
}
// Merge our "inverse" child into our "inverse" parent
if (inverse_child != 0)
{
start_merge_child(inverse_child, inverse_parent);
resume_4:
while (!cb())
{
state = 4;
return;
}
cb = NULL;
// Delete "inverse" child data
start_delete_source(inverse_child);
resume_5:
while (!cb())
{
state = 5;
return;
}
cb = NULL;
// Delete "inverse" child metadata, rename parent over it,
// and also change parent links of the previous "inverse" child
rename_inverse_parent();
state = 6;
resume_6:
if (parent->waiting > 0)
return;
}
// Delete parents, except the "inverse" one
for (current_child = 0; current_child < chain_list.size(); current_child++)
{
if (chain_list[current_child] == inverse_parent)
continue;
start_delete_source(chain_list[current_child]);
resume_7:
while (!cb())
{
state = 7;
return;
}
cb = NULL;
delete_inode_config(chain_list[current_child]);
state = 8;
resume_8:
if (parent->waiting > 0)
return;
}
state = 9;
resume_9:
// Done
return;
}
void get_merge_children()
{
// Get all children of from..to
inode_config_t *from_cfg = parent->get_inode_cfg(from_name);
inode_config_t *to_cfg = parent->get_inode_cfg(to_name);
// Check that to_cfg is actually a child of from_cfg
// FIXME de-copypaste the following piece of code with snap_merger_t
inode_config_t *cur = to_cfg;
chain_list.push_back(cur->num);
while (cur->num != from_cfg->num && cur->parent_id != 0)
{
auto it = parent->cli->st_cli.inode_config.find(cur->parent_id);
if (it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Parent inode of layer %s (id %ld) not found\n", cur->name.c_str(), cur->parent_id);
exit(1);
}
cur = &it->second;
chain_list.push_back(cur->num);
}
if (cur->num != from_cfg->num)
{
fprintf(stderr, "Layer %s is not a child of %s\n", to_name.c_str(), from_name.c_str());
exit(1);
}
new_parent = from_cfg->parent_id;
// Calculate ranks
int i = chain_list.size()-1;
for (inode_t item: chain_list)
{
sources[item] = i--;
}
for (auto & ic: parent->cli->st_cli.inode_config)
{
if (!ic.second.parent_id)
{
continue;
}
auto it = sources.find(ic.second.parent_id);
if (it != sources.end() && sources.find(ic.second.num) == sources.end())
{
merge_children.push_back(ic.second.num);
if (ic.second.readonly || writers_stopped)
{
inverse_candidates[ic.second.num] = it->second;
}
}
}
}
void read_stats()
{
if (inverse_candidates.size() == 0)
{
return;
}
json11::Json::array reads;
for (auto cp: inverse_candidates)
{
inode_t inode = cp.first;
reads.push_back(json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
"/"+std::to_string(INODE_NO_POOL(inode))
) },
} }
});
}
for (auto cp: sources)
{
inode_t inode = cp.first;
reads.push_back(json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
"/"+std::to_string(INODE_NO_POOL(inode))
) },
} }
});
}
parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "success", reads },
}, [this](std::string err, json11::Json data)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error reading layer statistics from etcd: %s\n", err.c_str());
exit(1);
}
for (auto inode_result: data["responses"].array_items())
{
auto kv = parent->cli->st_cli.parse_etcd_kv(inode_result["kvs"][0]);
pool_id_t pool_id = 0;
inode_t inode = 0;
char null_byte = 0;
sscanf(kv.key.c_str() + parent->cli->st_cli.etcd_prefix.length()+13, "%u/%lu%c", &pool_id, &inode, &null_byte);
if (!inode || null_byte != 0)
{
fprintf(stderr, "Bad key returned from etcd: %s\n", kv.key.c_str());
exit(1);
}
auto pool_cfg_it = parent->cli->st_cli.pool_config.find(pool_id);
if (pool_cfg_it == parent->cli->st_cli.pool_config.end())
{
fprintf(stderr, "Pool %u does not exist\n", pool_id);
exit(1);
}
inode = INODE_WITH_POOL(pool_id, inode);
auto & pool_cfg = pool_cfg_it->second;
uint64_t used_bytes = kv.value["raw_used"].uint64_value() / pool_cfg.pg_size;
if (pool_cfg.scheme != POOL_SCHEME_REPLICATED)
{
used_bytes *= (pool_cfg.pg_size - pool_cfg.parity_chunks);
}
inode_used[inode] = used_bytes;
}
parent->ringloop->wakeup();
});
}
void choose_inverse_candidate()
{
uint64_t max_diff = 0;
for (auto cp: inverse_candidates)
{
inode_t child = cp.first;
uint64_t child_used = inode_used[child];
int rank = cp.second;
for (int i = chain_list.size()-rank; i < chain_list.size(); i++)
{
inode_t parent = chain_list[i];
uint64_t parent_used = inode_used[parent];
if (parent_used > child_used && (!max_diff || max_diff < (parent_used-child_used)))
{
max_diff = (parent_used-child_used);
inverse_parent = parent;
inverse_child = child;
}
}
}
}
void rename_inverse_parent()
{
auto child_it = parent->cli->st_cli.inode_config.find(inverse_child);
if (child_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inverse_child);
exit(1);
}
auto target_it = parent->cli->st_cli.inode_config.find(inverse_parent);
if (target_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inverse_parent);
exit(1);
}
inode_config_t *child_cfg = &child_it->second;
inode_config_t *target_cfg = &target_it->second;
std::string child_name = child_cfg->name;
std::string target_name = target_cfg->name;
std::string child_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(inverse_child))+
"/"+std::to_string(INODE_NO_POOL(inverse_child))
);
std::string target_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(inverse_parent))+
"/"+std::to_string(INODE_NO_POOL(inverse_parent))
);
// Fill new configuration
inode_config_t new_cfg = *child_cfg;
new_cfg.num = target_cfg->num;
new_cfg.parent_id = new_parent;
json11::Json::array cmp = json11::Json::array {
json11::Json::object {
{ "target", "MOD" },
{ "key", child_cfg_key },
{ "result", "LESS" },
{ "mod_revision", child_cfg->mod_revision+1 },
},
json11::Json::object {
{ "target", "MOD" },
{ "key", target_cfg_key },
{ "result", "LESS" },
{ "mod_revision", target_cfg->mod_revision+1 },
},
};
json11::Json::array txn = json11::Json::array {
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", child_cfg_key },
} },
},
json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", target_cfg_key },
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&new_cfg)).dump()) },
} },
},
json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/index/image/"+child_cfg->name) },
{ "value", base64_encode(json11::Json({
{ "id", INODE_NO_POOL(inverse_parent) },
{ "pool_id", (uint64_t)INODE_POOL(inverse_parent) },
}).dump()) },
} },
},
};
// Reparent children of inverse_child
for (auto & cp: parent->cli->st_cli.inode_config)
{
if (cp.second.parent_id == child_cfg->num)
{
auto cp_cfg = cp.second;
cp_cfg.parent_id = inverse_parent;
auto cp_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(cp.second.num))+
"/"+std::to_string(INODE_NO_POOL(cp.second.num))
);
cmp.push_back(json11::Json::object {
{ "target", "MOD" },
{ "key", cp_key },
{ "result", "LESS" },
{ "mod_revision", cp.second.mod_revision+1 },
});
txn.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", cp_key },
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&cp_cfg)).dump()) },
} },
});
}
}
parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "compare", cmp },
{ "success", txn },
}, [this, target_name, child_name](std::string err, json11::Json res)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error renaming %s to %s: %s\n", target_name.c_str(), child_name.c_str(), err.c_str());
exit(1);
}
if (!res["succeeded"].bool_value())
{
fprintf(
stderr, "Parent (%s), child (%s), or one of its children"
" configuration was modified during rename\n", target_name.c_str(), child_name.c_str()
);
exit(1);
}
printf("Layer %s renamed to %s\n", target_name.c_str(), child_name.c_str());
parent->ringloop->wakeup();
});
}
void delete_inode_config(inode_t cur)
{
auto cur_cfg_it = parent->cli->st_cli.inode_config.find(cur);
if (cur_cfg_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode 0x%lx disappeared\n", cur);
exit(1);
}
inode_config_t *cur_cfg = &cur_cfg_it->second;
std::string cur_name = cur_cfg->name;
std::string cur_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(cur))+
"/"+std::to_string(INODE_NO_POOL(cur))
);
parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "compare", json11::Json::array {
json11::Json::object {
{ "target", "MOD" },
{ "key", cur_cfg_key },
{ "result", "LESS" },
{ "mod_revision", cur_cfg->mod_revision+1 },
},
} },
{ "success", json11::Json::array {
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", cur_cfg_key },
} },
},
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/index/image/"+cur_name) },
} },
},
} },
}, [this, cur_name](std::string err, json11::Json res)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error deleting %s: %s\n", cur_name.c_str(), err.c_str());
exit(1);
}
if (!res["succeeded"].bool_value())
{
fprintf(stderr, "Layer %s configuration was modified during deletion\n", cur_name.c_str());
exit(1);
}
printf("Layer %s deleted\n", cur_name.c_str());
parent->ringloop->wakeup();
});
}
void start_merge_child(inode_t child_inode, inode_t target_inode)
{
auto child_it = parent->cli->st_cli.inode_config.find(child_inode);
if (child_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", child_inode);
exit(1);
}
auto target_it = parent->cli->st_cli.inode_config.find(target_inode);
if (target_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", target_inode);
exit(1);
}
cb = parent->start_merge(json11::Json::object {
{ "command", json11::Json::array{ "merge-data", from_name, child_it->second.name } },
{ "target", target_it->second.name },
{ "delete-source", false },
{ "cas", use_cas },
{ "fsync-interval", fsync_interval },
});
}
void start_delete_source(inode_t inode)
{
auto source = parent->cli->st_cli.inode_config.find(inode);
if (source == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inode);
exit(1);
}
cb = parent->start_rm(json11::Json::object {
{ "inode", inode },
{ "pool", (uint64_t)INODE_POOL(inode) },
{ "fsync-interval", fsync_interval },
});
}
};
std::function<bool(void)> cli_tool_t::start_snap_rm(json11::Json cfg)
{
json11::Json::array cmd = cfg["command"].array_items();
auto snap_remover = new snap_remover_t();
snap_remover->parent = this;
snap_remover->from_name = cmd.size() > 1 ? cmd[1].string_value() : "";
snap_remover->to_name = cmd.size() > 2 ? cmd[2].string_value() : "";
if (snap_remover->from_name == "")
{
fprintf(stderr, "Layer to remove argument is missing\n");
exit(1);
}
if (snap_remover->to_name == "")
{
snap_remover->to_name = snap_remover->from_name;
}
snap_remover->fsync_interval = cfg["fsync-interval"].uint64_value();
if (!snap_remover->fsync_interval)
snap_remover->fsync_interval = 128;
if (!cfg["cas"].is_null())
snap_remover->use_cas = cfg["cas"].uint64_value() ? 2 : 0;
if (!cfg["writers_stopped"].is_null())
snap_remover->writers_stopped = true;
return [snap_remover]()
{
snap_remover->loop();
if (snap_remover->is_done())
{
delete snap_remover;
return true;
}
return false;
};
}
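
As the header comment of this deleted file explains, choose_inverse_candidate() picks the (parent, child) pair with the largest parent_used - child_used difference, so the layer whose data is actually copied is the small one. A toy calculation with made-up sizes (not taken from the source):

#include <cstdint>
#include <cstdio>

int main()
{
    uint64_t GiB = 1024ull*1024*1024;
    uint64_t layer2_used = 40*GiB;   // "<layer 2>" from the example in the comment
    uint64_t child1_used = 4*GiB;    // "<child 1>", a child further down the chain
    // The inverted workflow copies <child 1>'s ~4 GiB down into <layer 2> and renames it,
    // instead of copying <layer 2>'s ~40 GiB up into <child 1>.
    uint64_t diff = layer2_used > child1_used ? layer2_used - child1_used : 0;
    printf("inverting this pair avoids copying %lu GiB\n", diff/GiB);   // 36
}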


@@ -143,7 +143,7 @@ void cluster_client_t::calc_wait(cluster_op_t *op)
} }
else /* if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP) */ else /* if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP) */
{ {
for (auto prev = op->prev; prev; prev = prev->prev) for (auto prev = op_queue_head; prev && prev != op; prev = prev->next)
{ {
if (prev->opcode == OSD_OP_WRITE && prev->flags & OP_FLUSH_BUFFER) if (prev->opcode == OSD_OP_WRITE && prev->flags & OP_FLUSH_BUFFER)
{ {
@@ -151,7 +151,7 @@ void cluster_client_t::calc_wait(cluster_op_t *op)
} }
else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ || prev->opcode == OSD_OP_READ_BITMAP) else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ || prev->opcode == OSD_OP_READ_BITMAP)
{ {
// Flushes are always in the beginning // Flushes are always in the beginning (we're scanning from the beginning of the queue)
break; break;
} }
} }
@@ -172,6 +172,7 @@ void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *n
(next->opcode == OSD_OP_READ || next->opcode == OSD_OP_READ_BITMAP) && (flags & OP_FLUSH_BUFFER)) (next->opcode == OSD_OP_READ || next->opcode == OSD_OP_READ_BITMAP) && (flags & OP_FLUSH_BUFFER))
{ {
next->prev_wait += inc; next->prev_wait += inc;
assert(next->prev_wait >= 0);
if (!next->prev_wait) if (!next->prev_wait)
{ {
if (next->opcode == OSD_OP_SYNC) if (next->opcode == OSD_OP_SYNC)
@@ -191,6 +192,7 @@ void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *n
if (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE) if (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE)
{ {
next->prev_wait += inc; next->prev_wait += inc;
assert(next->prev_wait >= 0);
if (!next->prev_wait) if (!next->prev_wait)
{ {
if (next->opcode == OSD_OP_SYNC) if (next->opcode == OSD_OP_SYNC)


@@ -200,7 +200,8 @@ void cluster_client_t::send_list(inode_list_osd_t *cur_list)
auto & pool_cfg = st_cli.pool_config[cur_list->pg->lst->pool_id]; auto & pool_cfg = st_cli.pool_config[cur_list->pg->lst->pool_id];
osd_op_t *op = new osd_op_t(); osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT; op->op_type = OSD_OP_OUT;
op->peer_fd = msgr.osd_peer_fds[cur_list->osd_num]; // Already checked that it exists above, but anyway
op->peer_fd = msgr.osd_peer_fds.at(cur_list->osd_num);
op->req = (osd_any_op_t){ op->req = (osd_any_op_t){
.sec_list = { .sec_list = {
.header = { .header = {


@@ -351,9 +351,9 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
} }
else else
{ {
printf("+++ %s 0x%lx 0x%llx+%llx\n", printf("+++ %s 0x%lx 0x%llx+%lx\n",
io->ddir == DDIR_READ ? "READ" : "WRITE", io->ddir == DDIR_READ ? "READ" : "WRITE",
(uint64_t)io, io->offset, io->xfer_buflen); (uint64_t)io, io->offset, (uint64_t)io->xfer_buflen);
} }
} }


@@ -170,14 +170,14 @@ static int sec_init(struct thread_data *td)
bsd->block_order = o->block_order == 0 ? 17 : o->block_order; bsd->block_order = o->block_order == 0 ? 17 : o->block_order;
bsd->block_size = 1 << o->block_order; bsd->block_size = 1 << o->block_order;
sockaddr addr; sockaddr_storage addr;
if (!string_to_addr(std::string(o->host ? o->host : "127.0.0.1"), false, o->port > 0 ? o->port : 11203, &addr)) if (!string_to_addr(std::string(o->host ? o->host : "127.0.0.1"), false, o->port > 0 ? o->port : 11203, &addr))
{ {
fprintf(stderr, "server address: %s is not valid\n", o->host ? o->host : "127.0.0.1"); fprintf(stderr, "server address: %s is not valid\n", o->host ? o->host : "127.0.0.1");
return 1; return 1;
} }
bsd->connect_fd = socket(addr.sa_family, SOCK_STREAM, 0); bsd->connect_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (bsd->connect_fd < 0) if (bsd->connect_fd < 0)
{ {
perror("socket"); perror("socket");
@@ -355,7 +355,7 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int
{ {
if (reply.hdr.retval != io->xfer_buflen) if (reply.hdr.retval != io->xfer_buflen)
{ {
fprintf(stderr, "Short read: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen); fprintf(stderr, "Short read: retval = %ld instead of %lu\n", reply.hdr.retval, (uint64_t)io->xfer_buflen);
exit(1); exit(1);
} }
// Support bitmap // Support bitmap
@@ -380,7 +380,7 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int
{ {
if (reply.hdr.retval != io->xfer_buflen) if (reply.hdr.retval != io->xfer_buflen)
{ {
fprintf(stderr, "Short write: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen); fprintf(stderr, "Short write: retval = %ld instead of %lu\n", reply.hdr.retval, (uint64_t)io->xfer_buflen);
exit(1); exit(1);
} }
} }
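
The sockaddr to sockaddr_storage change above repeats in several files below: plain struct sockaddr is only 16 bytes, too small for an IPv6 sockaddr_in6, while sockaddr_storage is large and aligned enough for any address family, and its ss_family field replaces sa_family. A standalone sketch of the same pattern using getaddrinfo() instead of Vitastor's string_to_addr() helper (names and error handling are illustrative):

#include <string.h>
#include <netdb.h>
#include <unistd.h>
#include <sys/socket.h>

static int connect_any(const char *host, const char *port)
{
    struct addrinfo hints = {}, *res = NULL;
    hints.ai_family = AF_UNSPEC;        // accept both AF_INET and AF_INET6 results
    hints.ai_socktype = SOCK_STREAM;
    if (getaddrinfo(host, port, &hints, &res) != 0 || !res)
        return -1;
    struct sockaddr_storage addr = {};
    memcpy(&addr, res->ai_addr, res->ai_addrlen);   // always fits: storage >= any sockaddr_*
    socklen_t addrlen = res->ai_addrlen;
    freeaddrinfo(res);
    int fd = socket(addr.ss_family, SOCK_STREAM, 0);
    if (fd >= 0 && connect(fd, (struct sockaddr *)&addr, addrlen) < 0)
    {
        close(fd);
        return -1;
    }
    return fd;
}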


@@ -62,9 +62,10 @@ struct http_co_t
void run_cb_and_clear(); void run_cb_and_clear();
void start_connection(); void start_connection();
void close_connection(); void close_connection();
void next_request();
void handle_events(); void handle_events();
void handle_connect_result(); void handle_connect_result();
void submit_read(); void submit_read(bool check_timeout);
void submit_send(); void submit_send();
bool handle_read(); bool handle_read();
void post_message(int type, const std::string & msg); void post_message(int type, const std::string & msg);
@@ -128,6 +129,7 @@ void http_co_t::run_cb_and_clear()
// Call callback after clearing it because otherwise we may hit reenterability problems // Call callback after clearing it because otherwise we may hit reenterability problems
if (cb != NULL) if (cb != NULL)
cb(&parsed); cb(&parsed);
next_request();
} }
void http_co_t::send_request(const std::string & host, const std::string & request, void http_co_t::send_request(const std::string & host, const std::string & request,
@@ -161,17 +163,6 @@ void http_co_t::send_request(const std::string & host, const std::string & reque
this->sent = 0; this->sent = 0;
this->response_callback = response_callback; this->response_callback = response_callback;
this->parsed = {}; this->parsed = {};
if (request_timeout > 0)
{
timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id)
{
stackin();
close_connection();
parsed = { .error = "HTTP request timed out" };
run_cb_and_clear();
stackout();
});
}
if (state == HTTP_CO_KEEPALIVE) if (state == HTTP_CO_KEEPALIVE)
{ {
state = HTTP_CO_SENDING_REQUEST; state = HTTP_CO_SENDING_REQUEST;
@@ -181,6 +172,28 @@ void http_co_t::send_request(const std::string & host, const std::string & reque
{ {
start_connection(); start_connection();
} }
// Do it _after_ state assignment because set_timer() can actually trigger
// other timers and requests (reenterability is our friend)
if (request_timeout > 0)
{
timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id)
{
stackin();
if (state == HTTP_CO_REQUEST_SENT)
{
// In case of high CPU load, we may not handle etcd responses in time
// For this case, first check the socket and only then terminate request with the timeout
submit_read(true);
}
else
{
close_connection();
parsed = { .error = "HTTP request timed out" };
run_cb_and_clear();
}
stackout();
});
}
stackout(); stackout();
} }
@@ -271,17 +284,19 @@ void http_co_t::close_connection()
void http_co_t::start_connection() void http_co_t::start_connection()
{ {
stackin(); stackin();
struct sockaddr addr; struct sockaddr_storage addr;
if (!string_to_addr(host.c_str(), 1, 80, &addr)) if (!string_to_addr(host.c_str(), 1, 80, &addr))
{ {
close_connection();
parsed = { .error = "Invalid address: "+host }; parsed = { .error = "Invalid address: "+host };
run_cb_and_clear(); run_cb_and_clear();
stackout(); stackout();
return; return;
} }
peer_fd = socket(addr.sa_family, SOCK_STREAM, 0); peer_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (peer_fd < 0) if (peer_fd < 0)
{ {
close_connection();
parsed = { .error = std::string("socket: ")+strerror(errno) }; parsed = { .error = std::string("socket: ")+strerror(errno) };
run_cb_and_clear(); run_cb_and_clear();
stackout(); stackout();
@@ -323,7 +338,7 @@ void http_co_t::handle_events()
epoll_events &= ~EPOLLOUT; epoll_events &= ~EPOLLOUT;
if (epoll_events & EPOLLIN) if (epoll_events & EPOLLIN)
{ {
submit_read(); submit_read(false);
} }
else if (epoll_events & (EPOLLRDHUP|EPOLLERR)) else if (epoll_events & (EPOLLRDHUP|EPOLLERR))
{ {
@@ -410,10 +425,11 @@ again:
stackout(); stackout();
} }
void http_co_t::submit_read() void http_co_t::submit_read(bool check_timeout)
{ {
stackin(); stackin();
int res; int res;
again:
if (rbuf.size() != READ_BUFFER_SIZE) if (rbuf.size() != READ_BUFFER_SIZE)
{ {
rbuf.resize(READ_BUFFER_SIZE); rbuf.resize(READ_BUFFER_SIZE);
@@ -428,7 +444,22 @@ void http_co_t::submit_read()
} }
if (res == -EAGAIN || res == -EINTR) if (res == -EAGAIN || res == -EINTR)
{ {
epoll_events = epoll_events & ~EPOLLIN; if (check_timeout)
{
if (res == -EINTR)
goto again;
else
{
// Timeout happened and there is no data to read
close_connection();
parsed = { .error = "HTTP request timed out" };
run_cb_and_clear();
}
}
else
{
epoll_events = epoll_events & ~EPOLLIN;
}
} }
else if (res <= 0) else if (res <= 0)
{ {
@@ -501,8 +532,11 @@ bool http_co_t::handle_read()
if (state == HTTP_CO_HEADERS_RECEIVED && target_response_size > 0 && response.size() >= target_response_size) if (state == HTTP_CO_HEADERS_RECEIVED && target_response_size > 0 && response.size() >= target_response_size)
{ {
std::swap(parsed.body, response); std::swap(parsed.body, response);
response_callback(&parsed); if (!keepalive)
parsed.eof = true; close_connection();
else
state = HTTP_CO_KEEPALIVE;
run_cb_and_clear();
} }
else if (state == HTTP_CO_CHUNKED && response.size() > 0) else if (state == HTTP_CO_CHUNKED && response.size() > 0)
{ {
@@ -533,10 +567,14 @@ bool http_co_t::handle_read()
response_callback(&parsed); response_callback(&parsed);
parsed.body = ""; parsed.body = "";
} }
if (parsed.eof && !want_streaming) else if (parsed.eof)
{ {
// Normal response // Normal response
response_callback(&parsed); if (!keepalive)
close_connection();
else
state = HTTP_CO_KEEPALIVE;
run_cb_and_clear();
} }
} }
else if (state == HTTP_CO_WEBSOCKET && response.size() > 0) else if (state == HTTP_CO_WEBSOCKET && response.size() > 0)
@@ -547,29 +585,20 @@ bool http_co_t::handle_read()
parsed.body = ""; parsed.body = "";
} }
} }
if (parsed.eof)
{
response_callback = NULL;
parsed = {};
if (!keepalive)
{
close_connection();
}
else
{
state = HTTP_CO_KEEPALIVE;
if (keepalive_queue.size() > 0)
{
auto next = keepalive_queue[0];
keepalive_queue.erase(keepalive_queue.begin(), keepalive_queue.begin()+1);
next();
}
}
}
stackout(); stackout();
return true; return true;
} }
void http_co_t::next_request()
{
if (keepalive_queue.size() > 0)
{
auto next = keepalive_queue[0];
keepalive_queue.erase(keepalive_queue.begin(), keepalive_queue.begin()+1);
next();
}
}
uint64_t stoull_full(const std::string & str, int base) uint64_t stoull_full(const std::string & str, int base)
{ {
if (isspace(str[0])) if (isspace(str[0]))
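
The reworked timeout handler above calls submit_read(true) so that a response already buffered in the socket is not discarded just because the timer fired first. A standalone sketch of that idea on a raw non-blocking fd (http_co_t itself is not used here; names are illustrative):

#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <stddef.h>

// Returns true if some bytes were read (keep processing the response),
// false if the non-blocking socket had nothing buffered (the timeout stands).
static bool drain_before_timeout(int nonblocking_fd, char *buf, size_t len, ssize_t *out_len)
{
    ssize_t r;
    do {
        r = read(nonblocking_fd, buf, len);
    } while (r < 0 && errno == EINTR);   // same EINTR retry as the "goto again" path above
    *out_len = r;
    return r > 0;
}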


@@ -222,13 +222,13 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port) void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
{ {
assert(peer_osd != this->osd_num); assert(peer_osd != this->osd_num);
struct sockaddr addr; struct sockaddr_storage addr;
if (!string_to_addr(peer_host, 0, peer_port, &addr)) if (!string_to_addr(peer_host, 0, peer_port, &addr))
{ {
on_connect_peer(peer_osd, -EINVAL); on_connect_peer(peer_osd, -EINVAL);
return; return;
} }
int peer_fd = socket(addr.sa_family, SOCK_STREAM, 0); int peer_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (peer_fd < 0) if (peer_fd < 0)
{ {
on_connect_peer(peer_osd, -errno); on_connect_peer(peer_osd, -errno);
@@ -484,10 +484,10 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
void osd_messenger_t::accept_connections(int listen_fd) void osd_messenger_t::accept_connections(int listen_fd)
{ {
// Accept new connections // Accept new connections
sockaddr addr; sockaddr_storage addr;
socklen_t peer_addr_size = sizeof(addr); socklen_t peer_addr_size = sizeof(addr);
int peer_fd; int peer_fd;
while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0) while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
{ {
assert(peer_fd != 0); assert(peer_fd != 0);
fprintf(stderr, "[OSD %lu] new client %d: connection from %s\n", this->osd_num, peer_fd, fprintf(stderr, "[OSD %lu] new client %d: connection from %s\n", this->osd_num, peer_fd,


@@ -49,7 +49,7 @@ struct osd_client_t
{ {
int refs = 0; int refs = 0;
sockaddr peer_addr; sockaddr_storage peer_addr;
int peer_port; int peer_port;
int peer_fd; int peer_fd;
int peer_state; int peer_state;


@@ -3,7 +3,6 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <sys/mman.h>
#include "msgr_rdma.h" #include "msgr_rdma.h"
#include "messenger.h" #include "messenger.h"
@@ -55,7 +54,6 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level) msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level)
{ {
int res; int res;
bool odp = true;
ibv_device **dev_list = NULL; ibv_device **dev_list = NULL;
msgr_rdma_context_t *ctx = new msgr_rdma_context_t(); msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
ctx->mtu = mtu; ctx->mtu = mtu;
@@ -119,9 +117,9 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev)); fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev));
goto cleanup; goto cleanup;
} }
if ((res = ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid)) != 0) if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid))
{ {
fprintf(stderr, "Couldn't read RDMA device %s GID index %d: %s\n", ibv_get_device_name(ctx->dev), gid_index, strerror(res)); fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index);
goto cleanup; goto cleanup;
} }
@@ -133,9 +131,9 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
} }
{ {
if ((res = ibv_query_device_ex(ctx->context, NULL, &ctx->attrx)) != 0) if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx))
{ {
fprintf(stderr, "Couldn't query RDMA device %s for its features: %s\n", ibv_get_device_name(ctx->dev), strerror(res)); fprintf(stderr, "Couldn't query RDMA device for its features\n");
goto cleanup; goto cleanup;
} }
if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) || if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
@@ -143,20 +141,15 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) || !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)) !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
{ {
fprintf(stderr, "Warning: RDMA device isn't implicit ODP (On-Demand Paging) capable, trying to lock all application memory\n"); fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n");
if (mlockall(MCL_CURRENT|MCL_FUTURE|MCL_ONFAULT) != 0) goto cleanup;
{
fprintf(stderr, "mlockall() failed: %s\n", strerror(errno));
goto cleanup;
}
odp = false;
} }
} }
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | (odp ? IBV_ACCESS_ON_DEMAND : 0)); ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
if (!ctx->mr) if (!ctx->mr)
{ {
fprintf(stderr, "Couldn't register RDMA memory region: %s\n", strerror(errno)); fprintf(stderr, "Couldn't register RDMA memory region\n");
goto cleanup; goto cleanup;
} }
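
With the mlockall() fallback removed above, the messenger now requires implicit ODP (On-Demand Paging), which is what lets the single ibv_reg_mr(pd, NULL, SIZE_MAX, ... | IBV_ACCESS_ON_DEMAND) registration cover the whole address space without pinning memory. A standalone restatement of the capability gate (same libibverbs calls as in the diff; error handling reduced to a boolean):

#include <infiniband/verbs.h>

static bool rc_implicit_odp_usable(ibv_context *ctx)
{
    ibv_device_attr_ex attrx;
    if (ibv_query_device_ex(ctx, NULL, &attrx) != 0)
        return false;
    // Mirrors the condition in msgr_rdma above: general ODP support plus RC
    // send/receive ODP support; only then is the SIZE_MAX on-demand MR registered.
    return (attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) &&
        (attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) &&
        (attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV);
}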


@@ -363,7 +363,8 @@ public:
setsid(); setsid();
if (fork()) if (fork())
exit(0); exit(0);
chdir("/"); if (chdir("/") != 0)
fprintf(stderr, "Warning: Failed to chdir into /\n");
close(0); close(0);
close(1); close(1);
close(2); close(2);
@@ -525,7 +526,11 @@ protected:
{ {
goto end_unmap; goto end_unmap;
} }
write(qd_fd, "32768", 5); r = write(qd_fd, "32768", 5);
if (r != 5)
{
fprintf(stderr, "Warning: Failed to configure max_sectors_kb\n");
}
close(qd_fd); close(qd_fd);
if (!fork()) if (!fork())
{ {


@@ -57,7 +57,11 @@ osd_t::osd_t(const json11::Json & config, ring_loop_t *ringloop)
if (this->config["osd_memlock"] == "true" || this->config["osd_memlock"] == "1" || this->config["osd_memlock"] == "yes") if (this->config["osd_memlock"] == "true" || this->config["osd_memlock"] == "1" || this->config["osd_memlock"] == "yes")
{ {
// Lock all OSD memory if requested // Lock all OSD memory if requested
if (mlockall(MCL_CURRENT|MCL_FUTURE|MCL_ONFAULT) != 0) if (mlockall(MCL_CURRENT|MCL_FUTURE
#ifdef MCL_ONFAULT
| MCL_ONFAULT
#endif
) != 0)
{ {
fprintf(stderr, "osd_memlock is set to true, but mlockall() failed: %s\n", strerror(errno)); fprintf(stderr, "osd_memlock is set to true, but mlockall() failed: %s\n", strerror(errno));
exit(-1); exit(-1);
@@ -196,46 +200,7 @@ void osd_t::bind_socket()
// FIXME Support multiple listening sockets // FIXME Support multiple listening sockets
sockaddr addr; listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (bind_port == 0)
{
socklen_t len = sizeof(addr);
if (getsockname(listen_fd, (sockaddr *)&addr, &len) == -1)
{
close(listen_fd);
throw std::runtime_error(std::string("getsockname: ") + strerror(errno));
}
listening_port = ntohs(((sockaddr_in*)&addr)->sin_port);
}
else
{
listening_port = bind_port;
}
if (listen(listen_fd, listen_backlog) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events) epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)


@@ -211,7 +211,7 @@ class osd_t
// flushing, recovery and backfill // flushing, recovery and backfill
void submit_pg_flush_ops(pg_t & pg); void submit_pg_flush_ops(pg_t & pg);
void handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval); void handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval);
void submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data); bool submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data);
bool pick_next_recovery(osd_recovery_op_t &op); bool pick_next_recovery(osd_recovery_op_t &op);
void submit_recovery_op(osd_recovery_op_t *op); void submit_recovery_op(osd_recovery_op_t *op);
bool continue_recovery(); bool continue_recovery();


@@ -457,7 +457,8 @@ void osd_t::renew_lease()
if (err == "" && data["result"]["TTL"].string_value() == "") if (err == "" && data["result"]["TTL"].string_value() == "")
{ {
// Die // Die
throw std::runtime_error("etcd lease has expired"); fprintf(stderr, "Error refreshing etcd lease\n");
force_stop(1);
} }
if (err != "") if (err != "")
{ {
@@ -466,7 +467,8 @@ void osd_t::renew_lease()
if (etcd_failed_attempts > st_cli.max_etcd_attempts) if (etcd_failed_attempts > st_cli.max_etcd_attempts)
{ {
// Die // Die
throw std::runtime_error("Cluster connection failed"); fprintf(stderr, "Cluster connection failed\n");
force_stop(1);
} }
// Retry // Retry
tfd->set_timer(st_cli.etcd_quick_timeout, false, [this](int timer_id) tfd->set_timer(st_cli.etcd_quick_timeout, false, [this](int timer_id)


@@ -47,7 +47,8 @@ void osd_t::submit_pg_flush_ops(pg_t & pg)
if (l.second.size() > 0) if (l.second.size() > 0)
{ {
fb->flush_ops++; fb->flush_ops++;
submit_flush_op(pg.pool_id, pg.pg_num, fb, true, l.first, l.second.size(), l.second.data()); if (!submit_flush_op(pg.pool_id, pg.pg_num, fb, true, l.first, l.second.size(), l.second.data()))
return;
} }
} }
for (auto & l: fb->stable_lists) for (auto & l: fb->stable_lists)
@@ -55,7 +56,8 @@ void osd_t::submit_pg_flush_ops(pg_t & pg)
if (l.second.size() > 0) if (l.second.size() > 0)
{ {
fb->flush_ops++; fb->flush_ops++;
submit_flush_op(pg.pool_id, pg.pg_num, fb, false, l.first, l.second.size(), l.second.data()); if (!submit_flush_op(pg.pool_id, pg.pg_num, fb, false, l.first, l.second.size(), l.second.data()))
return;
} }
} }
} }
@@ -160,7 +162,7 @@ void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, p
} }
} }
void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data) bool osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data)
{ {
osd_op_t *op = new osd_op_t(); osd_op_t *op = new osd_op_t();
// Copy buffer so it gets freed along with the operation // Copy buffer so it gets freed along with the operation
@@ -188,10 +190,8 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
else else
{ {
// Peer // Peer
int peer_fd = msgr.osd_peer_fds[peer_osd];
op->op_type = OSD_OP_OUT; op->op_type = OSD_OP_OUT;
op->iov.push_back(op->buf, count * sizeof(obj_ver_id)); op->iov.push_back(op->buf, count * sizeof(obj_ver_id));
op->peer_fd = peer_fd;
op->req = (osd_any_op_t){ op->req = (osd_any_op_t){
.sec_stab = { .sec_stab = {
.header = { .header = {
@@ -207,8 +207,21 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval); handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval);
delete op; delete op;
}; };
msgr.outbox_push(op); auto peer_fd_it = msgr.osd_peer_fds.find(peer_osd);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
op->peer_fd = peer_fd_it->second;
msgr.outbox_push(op);
}
else
{
// Fail it immediately
op->reply.hdr.retval = -EPIPE;
op->callback(op);
return false;
}
} }
return true;
} }
bool osd_t::pick_next_recovery(osd_recovery_op_t &op) bool osd_t::pick_next_recovery(osd_recovery_op_t &op)


@@ -29,8 +29,10 @@ void osd_t::handle_peers()
degraded_objects += p.second.degraded_objects.size(); degraded_objects += p.second.degraded_objects.size();
if (p.second.state & PG_HAS_UNCLEAN) if (p.second.state & PG_HAS_UNCLEAN)
peering_state = peering_state | OSD_FLUSHING_PGS; peering_state = peering_state | OSD_FLUSHING_PGS;
else if (p.second.state & PG_HAS_DEGRADED) else if (p.second.state & (PG_HAS_DEGRADED | PG_HAS_MISPLACED))
peering_state = peering_state | OSD_RECOVERING; peering_state = peering_state | OSD_RECOVERING;
ringloop->wakeup();
return;
} }
else else
{ {
@@ -340,7 +342,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p
else else
{ {
// Peer // Peer
auto & cl = msgr.clients.at(msgr.osd_peer_fds[role_osd]); auto & cl = msgr.clients.at(msgr.osd_peer_fds.at(role_osd));
osd_op_t *op = new osd_op_t(); osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT; op->op_type = OSD_OP_OUT;
op->peer_fd = cl->peer_fd; op->peer_fd = cl->peer_fd;
@@ -394,7 +396,9 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
{ {
if (op->bs_op->retval < 0) if (op->bs_op->retval < 0)
{ {
throw std::runtime_error("local OP_LIST failed"); printf("Local OP_LIST failed: retval=%d\n", op->bs_op->retval);
force_stop(1);
return;
} }
add_bs_subop_stats(op); add_bs_subop_stats(op);
printf( printf(
@@ -419,7 +423,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
// Peer // Peer
osd_op_t *op = new osd_op_t(); osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT; op->op_type = OSD_OP_OUT;
op->peer_fd = msgr.osd_peer_fds[role_osd]; op->peer_fd = msgr.osd_peer_fds.at(role_osd);
op->req = (osd_any_op_t){ op->req = (osd_any_op_t){
.sec_list = { .sec_list = {
.header = { .header = {


@@ -246,7 +246,6 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
// Send to a remote OSD // Send to a remote OSD
osd_op_t *subop = op_data->subops+subop_idx; osd_op_t *subop = op_data->subops+subop_idx;
subop->op_type = OSD_OP_OUT; subop->op_type = OSD_OP_OUT;
subop->peer_fd = msgr.osd_peer_fds.at(subop_osd_num);
// FIXME: Use the pre-allocated buffer // FIXME: Use the pre-allocated buffer
subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev)); subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev));
subop->req = (osd_any_op_t){ subop->req = (osd_any_op_t){
@@ -287,7 +286,18 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
} }
handle_primary_subop(subop, cur_op); handle_primary_subop(subop, cur_op);
}; };
msgr.outbox_push(subop); auto peer_fd_it = msgr.osd_peer_fds.find(subop_osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subop->peer_fd = peer_fd_it->second;
msgr.outbox_push(subop);
}
else
{
// Fail it immediately
subop->reply.hdr.retval = -EPIPE;
subop->callback(subop);
}
subop_idx++; subop_idx++;
} }
prev = i+1; prev = i+1;


@@ -182,7 +182,6 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
else else
{ {
subop->op_type = OSD_OP_OUT; subop->op_type = OSD_OP_OUT;
subop->peer_fd = msgr.osd_peer_fds.at(role_osd_num);
subop->bitmap = stripes[stripe_num].bmp_buf; subop->bitmap = stripes[stripe_num].bmp_buf;
subop->bitmap_len = clean_entry_bitmap_size; subop->bitmap_len = clean_entry_bitmap_size;
subop->req.sec_rw = { subop->req.sec_rw = {
@@ -225,7 +224,18 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
{ {
handle_primary_subop(subop, cur_op); handle_primary_subop(subop, cur_op);
}; };
msgr.outbox_push(subop); auto peer_fd_it = msgr.osd_peer_fds.find(role_osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subop->peer_fd = peer_fd_it->second;
msgr.outbox_push(subop);
}
else
{
// Fail it immediately
subop->reply.hdr.retval = -EPIPE;
subop->callback(subop);
}
} }
i++; i++;
} }
@@ -463,7 +473,6 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
else else
{ {
subops[i].op_type = OSD_OP_OUT; subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = msgr.osd_peer_fds.at(chunk.osd_num);
subops[i].req = (osd_any_op_t){ .sec_del = { subops[i].req = (osd_any_op_t){ .sec_del = {
.header = { .header = {
.magic = SECONDARY_OSD_OP_MAGIC, .magic = SECONDARY_OSD_OP_MAGIC,
@@ -477,7 +486,18 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
{ {
handle_primary_subop(subop, cur_op); handle_primary_subop(subop, cur_op);
}; };
msgr.outbox_push(&subops[i]); auto peer_fd_it = msgr.osd_peer_fds.find(chunk.osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subops[i].peer_fd = peer_fd_it->second;
msgr.outbox_push(&subops[i]);
}
else
{
// Fail it immediately
subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]);
}
} }
} }
} }
@@ -567,7 +587,6 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
else else
{ {
subops[i].op_type = OSD_OP_OUT; subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = msgr.osd_peer_fds.at(stab_osd.osd_num);
subops[i].req = (osd_any_op_t){ .sec_stab = { subops[i].req = (osd_any_op_t){ .sec_stab = {
.header = { .header = {
.magic = SECONDARY_OSD_OP_MAGIC, .magic = SECONDARY_OSD_OP_MAGIC,
@@ -581,7 +600,18 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
{ {
handle_primary_subop(subop, cur_op); handle_primary_subop(subop, cur_op);
}; };
msgr.outbox_push(&subops[i]); auto peer_fd_it = msgr.osd_peer_fds.find(stab_osd.osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subops[i].peer_fd = peer_fd_it->second;
msgr.outbox_push(&subops[i]);
}
else
{
// Fail it immediately
subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]);
}
} }
} }
} }


@@ -8,7 +8,7 @@
#include "osd_id.h" #include "osd_id.h"
#ifndef MEM_ALIGNMENT #ifndef MEM_ALIGNMENT
#define MEM_ALIGNMENT 512 #define MEM_ALIGNMENT 4096
#endif #endif
struct buf_len_t struct buf_len_t
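
For context on the 512 to 4096 bump: buffers used with O_DIRECT descriptors must be aligned to the device's logical block size, and 4K-sector devices reject 512-byte alignment. A minimal sketch, assuming MEM_ALIGNMENT is used for such I/O buffers (the diff itself only changes the constant):

#include <stdlib.h>
#include <stdexcept>

#ifndef MEM_ALIGNMENT
#define MEM_ALIGNMENT 4096
#endif

static void *alloc_io_buffer(size_t len)
{
    void *buf = NULL;
    if (posix_memalign(&buf, MEM_ALIGNMENT, len) != 0)   // aligned for direct I/O
        throw std::runtime_error("posix_memalign failed");
    return buf;
}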


@@ -134,14 +134,14 @@ int main(int narg, char *args[])
int connect_osd(const char *osd_address, int osd_port) int connect_osd(const char *osd_address, int osd_port)
{ {
struct sockaddr addr; struct sockaddr_storage addr;
if (!string_to_addr(osd_address, 0, osd_port, &addr)) if (!string_to_addr(osd_address, 0, osd_port, &addr))
{ {
fprintf(stderr, "server address: %s is not valid\n", osd_address); fprintf(stderr, "server address: %s is not valid\n", osd_address);
return -1; return -1;
} }
int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0); int connect_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (connect_fd < 0) if (connect_fd < 0)
{ {
perror("socket"); perror("socket");


@@ -67,14 +67,14 @@ int main(int narg, char *args[])
int connect_stub(const char *server_address, int server_port) int connect_stub(const char *server_address, int server_port)
{ {
struct sockaddr addr; struct sockaddr_storage addr;
if (!string_to_addr(server_address, 0, server_port, &addr)) if (!string_to_addr(server_address, 0, server_port, &addr))
{ {
fprintf(stderr, "server address: %s is not valid\n", server_address); fprintf(stderr, "server address: %s is not valid\n", server_address);
return -1; return -1;
} }
int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0); int connect_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (connect_fd < 0) if (connect_fd < 0)
{ {
perror("socket"); perror("socket");


@@ -41,21 +41,19 @@
#include "rw_blocking.h" #include "rw_blocking.h"
#include "osd_ops.h" #include "osd_ops.h"
int bind_stub(std::string bind_address, int bind_port);
void run_stub(int peer_fd); void run_stub(int peer_fd);
int main(int narg, char *args[]) int main(int narg, char *args[])
{ {
int listen_fd = bind_stub("0.0.0.0", 11203); int listen_fd = create_and_bind_socket("0.0.0.0", 11203, 128, NULL);
// Accept new connections // Accept new connections
sockaddr addr; sockaddr_storage addr;
socklen_t peer_addr_size = sizeof(addr); socklen_t peer_addr_size = sizeof(addr);
int peer_fd; int peer_fd;
while (1) while (1)
{ {
printf("stub_osd: waiting for 1 client\n"); printf("stub_osd: waiting for 1 client\n");
peer_fd = accept(listen_fd, &addr, &peer_addr_size); peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size);
if (peer_fd == -1) if (peer_fd == -1)
{ {
if (errno == EAGAIN) if (errno == EAGAIN)
@@ -76,39 +74,6 @@ int main(int narg, char *args[])
return 0; return 0;
} }
int bind_stub(std::string bind_address, int bind_port)
{
int listen_backlog = 128;
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (listen(listen_fd, listen_backlog) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
return listen_fd;
}
void run_stub(int peer_fd) void run_stub(int peer_fd)
{ {
osd_any_op_t op; osd_any_op_t op;


@@ -25,8 +25,6 @@
#include "epoll_manager.h" #include "epoll_manager.h"
#include "messenger.h" #include "messenger.h"
int bind_stub(std::string bind_address, int bind_port);
void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op); void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op);
int main(int narg, char *args[]) int main(int narg, char *args[])
@@ -43,7 +41,8 @@ int main(int narg, char *args[])
json11::Json config = json11::Json::object { { "log_level", 1 } }; json11::Json config = json11::Json::object { { "log_level", 1 } };
msgr->parse_config(config); msgr->parse_config(config);
// Accept new connections // Accept new connections
int listen_fd = bind_stub("0.0.0.0", 11203); int listen_fd = create_and_bind_socket("0.0.0.0", 11203, 128, NULL);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [listen_fd, msgr](int fd, int events) epmgr->set_fd_handler(listen_fd, false, [listen_fd, msgr](int fd, int events)
{ {
msgr->accept_connections(listen_fd); msgr->accept_connections(listen_fd);
@@ -67,41 +66,6 @@ int main(int narg, char *args[])
return 0; return 0;
} }
int bind_stub(std::string bind_address, int bind_port)
{
int listen_backlog = 128;
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (listen(listen_fd, listen_backlog) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
return listen_fd;
}
void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op) void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op)
{ {
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;


@@ -6,7 +6,7 @@ includedir=${prefix}/@CMAKE_INSTALL_INCLUDEDIR@
Name: Vitastor Name: Vitastor
Description: Vitastor client library Description: Vitastor client library
Version: 0.6.12 Version: 0.6.15
Libs: -L${libdir} -lvitastor_client Libs: -L${libdir} -lvitastor_client
Cflags: -I${includedir} Cflags: -I${includedir}