Compare commits

..

13 Commits

Author SHA1 Message Date
117d6f0612 Release 0.6.14
- Fix IPv6 address parsing
- Fix "cannot read bytes of undefined" in the monitor on a fresh DB
- Fix possible hangs of read requests on OSD restarts without immediate_commit=all mode
- Fix OSDs skipping misplaced recovery in some cases
- Fix OSDs possibly dying with "map::at" errors when other OSDs are stopped
- Fix division by zero in ls if all pool OSDs are down
2022-02-17 14:43:44 +03:00
7d79c58095 Use the larger sockaddr_storage structure 2022-02-12 11:22:56 +03:00
46d2bc100f Add some tolerance to stat calculation so it does not fail on a fresh DB 2022-02-11 16:37:16 +03:00
732e2804e9 Fix operation dependency counter underflow for reads without immediate_commit=all mode 2022-02-11 10:54:11 +03:00
abaec2008c Fix OSDs missing misplaced recovery 2022-02-11 01:00:24 +03:00
8129d238a4 Different fio versions have different types for xfer_buflen, but Vitastor anyway does not support 128-bit offsets 2022-02-10 01:21:04 +03:00
61ebed144a Fix OSDs possibly dying with "map::at" errors when other OSDs are stopped 2022-02-09 10:35:29 +03:00
9d3ba113aa Extract bind socket code into a utility function 2022-02-06 00:39:52 +03:00
9788045dc9 Fix division by zero in ls if all pool OSDs are down 2022-02-05 17:03:37 +03:00
d6b0d29af6 4k MEM_ALIGNMENT 2022-02-05 17:03:37 +03:00
36f352f06f Release 0.6.13
- Fix client hangs possible on OSD restarts (bug affected versions from 0.5.11)
- Fix "Assertion `sqe != NULL' failed" io_uring-related crashes possible
  on some kernels (0.6.11 increased probability of this bug)
- Fix timeout=0 in NBD proxy
- Fix build under centos 7
2022-02-03 01:50:30 +03:00
318cc463c2 Fix warnings 2022-02-03 01:50:30 +03:00
145e5cfb86 MCL_ONFAULT is not available under centos 7 2022-02-03 01:42:19 +03:00
42 changed files with 234 additions and 234 deletions

View File

@@ -2,6 +2,6 @@ cmake_minimum_required(VERSION 2.8)
project(vitastor) project(vitastor)
set(VERSION "0.6.12") set(VERSION "0.6.14")
add_subdirectory(src) add_subdirectory(src)

View File

@@ -1,4 +1,4 @@
VERSION ?= v0.6.12 VERSION ?= v0.6.14
all: build push all: build push

View File

@@ -49,7 +49,7 @@ spec:
capabilities: capabilities:
add: ["SYS_ADMIN"] add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true allowPrivilegeEscalation: true
image: vitalif/vitastor-csi:v0.6.12 image: vitalif/vitastor-csi:v0.6.14
args: args:
- "--node=$(NODE_ID)" - "--node=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)" - "--endpoint=$(CSI_ENDPOINT)"

View File

@@ -116,7 +116,7 @@ spec:
privileged: true privileged: true
capabilities: capabilities:
add: ["SYS_ADMIN"] add: ["SYS_ADMIN"]
image: vitalif/vitastor-csi:v0.6.12 image: vitalif/vitastor-csi:v0.6.14
args: args:
- "--node=$(NODE_ID)" - "--node=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)" - "--endpoint=$(CSI_ENDPOINT)"

View File

@@ -5,7 +5,7 @@ package vitastor
const ( const (
vitastorCSIDriverName = "csi.vitastor.io" vitastorCSIDriverName = "csi.vitastor.io"
vitastorCSIDriverVersion = "0.6.12" vitastorCSIDriverVersion = "0.6.14"
) )
// Config struct fills the parameters of request or user input // Config struct fills the parameters of request or user input

2
debian/changelog vendored
View File

@@ -1,4 +1,4 @@
vitastor (0.6.12-1) unstable; urgency=medium vitastor (0.6.14-1) unstable; urgency=medium
* RDMA support * RDMA support
* Bugfixes * Bugfixes

View File

@@ -33,8 +33,8 @@ RUN set -e -x; \
mkdir -p /root/packages/vitastor-$REL; \ mkdir -p /root/packages/vitastor-$REL; \
rm -rf /root/packages/vitastor-$REL/*; \ rm -rf /root/packages/vitastor-$REL/*; \
cd /root/packages/vitastor-$REL; \ cd /root/packages/vitastor-$REL; \
cp -r /root/vitastor vitastor-0.6.12; \ cp -r /root/vitastor vitastor-0.6.14; \
cd vitastor-0.6.12; \ cd vitastor-0.6.14; \
ln -s /root/fio-build/fio-*/ ./fio; \ ln -s /root/fio-build/fio-*/ ./fio; \
FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \ FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
ls /usr/include/linux/raw.h || cp ./debian/raw.h /usr/include/linux/raw.h; \ ls /usr/include/linux/raw.h || cp ./debian/raw.h /usr/include/linux/raw.h; \
@@ -47,8 +47,8 @@ RUN set -e -x; \
rm -rf a b; \ rm -rf a b; \
echo "dep:fio=$FIO" > debian/fio_version; \ echo "dep:fio=$FIO" > debian/fio_version; \
cd /root/packages/vitastor-$REL; \ cd /root/packages/vitastor-$REL; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.12.orig.tar.xz vitastor-0.6.12; \ tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.14.orig.tar.xz vitastor-0.6.14; \
cd vitastor-0.6.12; \ cd vitastor-0.6.14; \
V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \ V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \ DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \ DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \

View File

@@ -48,28 +48,19 @@
type: string type: string
info: | info: |
RDMA device name to use for Vitastor OSD communications (for example, RDMA device name to use for Vitastor OSD communications (for example,
"rocep5s0f0"). Please note that if your RDMA device doesn't support "rocep5s0f0"). Please note that Vitastor RDMA requires Implicit On-Demand
Implicit ODP (Implicit On-Demand Paging) then all Vitastor OSDs and clients Paging (Implicit ODP) and Scatter/Gather (SG) support from the RDMA device
will have to use mlockall() to lock all application memory to use RDMA. to work. For example, Mellanox ConnectX-3 and older adapters don't have
In case of the native Vitastor QEMU driver with RDMA, all virtual machine Implicit ODP, so they're unsupported by Vitastor. Run `ibv_devinfo -v` as
memory will be locked if your RDMA device doesn't support Implicit ODP. root to list available RDMA devices and their features.
Notably, Mellanox ConnectX-3 and older adapters don't support Implicit ODP,
while ConnectX-4 and newer do. Run `ibv_devinfo -v` as root to list
available RDMA devices and their features.
info_ru: | info_ru: |
Название RDMA-устройства для связи с Vitastor OSD (например, "rocep5s0f0"). Название RDMA-устройства для связи с Vitastor OSD (например, "rocep5s0f0").
Имейте в виду, что если ваше устройство не поддерживает Implicit ODP Имейте в виду, что поддержка RDMA в Vitastor требует функций устройства
(Implicit On-Demand Paging), то все OSD и клиенты Vitastor будут вынуждены Implicit On-Demand Paging (Implicit ODP) и Scatter/Gather (SG). Например,
блокировать всю память приложения с помощью mlockall(), чтобы задействовать адаптеры Mellanox ConnectX-3 и более старые не поддерживают Implicit ODP и
RDMA. В случае нативного QEMU-драйвера это будет означать, что при потому не поддерживаются в Vitastor. Запустите `ibv_devinfo -v` от имени
использовании RDMA на устройстве без поддержки Implicit ODP блокироваться суперпользователя, чтобы посмотреть список доступных RDMA-устройств, их
от выгрузки будет вся память виртуальных машин. параметры и возможности.
В случае с адаптерами Mellanox Implicit ODP поддерживается начиная с
ConnectX-4. ConnectX-3 и более старые адаптеры не поддерживают Implicit ODP.
Чтобы посмотреть список своих RDMA-устройств и их возможностей, запустите
`ibv_devinfo -v` от имени суперпользователя.
- name: rdma_port_num - name: rdma_port_num
type: int type: int
default: 1 default: 1

View File

@@ -1345,21 +1345,30 @@ class Mon
const tm = prev_stats ? BigInt(timestamp - prev_stats.timestamp) : 0; const tm = prev_stats ? BigInt(timestamp - prev_stats.timestamp) : 0;
for (const op in op_stats) for (const op in op_stats)
{ {
op_stats[op].bps = prev_stats ? (op_stats[op].bytes - prev_stats.op_stats[op].bytes) * 1000n / tm : 0; if (prev_stats && prev_stats.op_stats && prev_stats.op_stats[op])
op_stats[op].iops = prev_stats ? (op_stats[op].count - prev_stats.op_stats[op].count) * 1000n / tm : 0; {
op_stats[op].lat = prev_stats ? (op_stats[op].usec - prev_stats.op_stats[op].usec) op_stats[op].bps = (op_stats[op].bytes - prev_stats.op_stats[op].bytes) * 1000n / tm;
/ ((op_stats[op].count - prev_stats.op_stats[op].count) || 1n) : 0; op_stats[op].iops = (op_stats[op].count - prev_stats.op_stats[op].count) * 1000n / tm;
op_stats[op].lat = (op_stats[op].usec - prev_stats.op_stats[op].usec)
/ ((op_stats[op].count - prev_stats.op_stats[op].count) || 1n);
}
} }
for (const op in subop_stats) for (const op in subop_stats)
{ {
subop_stats[op].iops = prev_stats ? (subop_stats[op].count - prev_stats.subop_stats[op].count) * 1000n / tm : 0; if (prev_stats && prev_stats.subop_stats && prev_stats.subop_stats[op])
subop_stats[op].lat = prev_stats ? (subop_stats[op].usec - prev_stats.subop_stats[op].usec) {
/ ((subop_stats[op].count - prev_stats.subop_stats[op].count) || 1n) : 0; subop_stats[op].iops = (subop_stats[op].count - prev_stats.subop_stats[op].count) * 1000n / tm;
subop_stats[op].lat = (subop_stats[op].usec - prev_stats.subop_stats[op].usec)
/ ((subop_stats[op].count - prev_stats.subop_stats[op].count) || 1n);
}
} }
for (const op in recovery_stats) for (const op in recovery_stats)
{ {
recovery_stats[op].bps = prev_stats ? (recovery_stats[op].bytes - prev_stats.recovery_stats[op].bytes) * 1000n / tm : 0; if (prev_stats && prev_stats.recovery_stats && prev_stats.recovery_stats[op])
recovery_stats[op].iops = prev_stats ? (recovery_stats[op].count - prev_stats.recovery_stats[op].count) * 1000n / tm : 0; {
recovery_stats[op].bps = (recovery_stats[op].bytes - prev_stats.recovery_stats[op].bytes) * 1000n / tm;
recovery_stats[op].iops = (recovery_stats[op].count - prev_stats.recovery_stats[op].count) * 1000n / tm;
}
} }
return { op_stats, subop_stats, recovery_stats }; return { op_stats, subop_stats, recovery_stats };
} }

View File

@@ -50,7 +50,7 @@ from cinder.volume import configuration
from cinder.volume import driver from cinder.volume import driver
from cinder.volume import volume_utils from cinder.volume import volume_utils
VERSION = '0.6.12' VERSION = '0.6.14'
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)

View File

@@ -25,4 +25,4 @@ rm fio
mv fio-copy fio mv fio-copy fio
FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'` FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
tar --transform 's#^#vitastor-0.6.12/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.12$(rpm --eval '%dist').tar.gz * tar --transform 's#^#vitastor-0.6.14/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.14$(rpm --eval '%dist').tar.gz *

View File

@@ -34,7 +34,7 @@ ADD . /root/vitastor
RUN set -e; \ RUN set -e; \
cd /root/vitastor/rpm; \ cd /root/vitastor/rpm; \
sh build-tarball.sh; \ sh build-tarball.sh; \
cp /root/vitastor-0.6.12.el7.tar.gz ~/rpmbuild/SOURCES; \ cp /root/vitastor-0.6.14.el7.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \ cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \ cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \ rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor Name: vitastor
Version: 0.6.12 Version: 0.6.14
Release: 1%{?dist} Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1 License: Vitastor Network Public License 1.1
URL: https://vitastor.io/ URL: https://vitastor.io/
Source0: vitastor-0.6.12.el7.tar.gz Source0: vitastor-0.6.14.el7.tar.gz
BuildRequires: liburing-devel >= 0.6 BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel BuildRequires: gperftools-devel

View File

@@ -33,7 +33,7 @@ ADD . /root/vitastor
RUN set -e; \ RUN set -e; \
cd /root/vitastor/rpm; \ cd /root/vitastor/rpm; \
sh build-tarball.sh; \ sh build-tarball.sh; \
cp /root/vitastor-0.6.12.el8.tar.gz ~/rpmbuild/SOURCES; \ cp /root/vitastor-0.6.14.el8.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \ cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \ cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \ rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor Name: vitastor
Version: 0.6.12 Version: 0.6.14
Release: 1%{?dist} Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1 License: Vitastor Network Public License 1.1
URL: https://vitastor.io/ URL: https://vitastor.io/
Source0: vitastor-0.6.12.el8.tar.gz Source0: vitastor-0.6.14.el8.tar.gz
BuildRequires: liburing-devel >= 0.6 BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel BuildRequires: gperftools-devel

View File

@@ -15,7 +15,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}") set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}")
endif() endif()
add_definitions(-DVERSION="0.6.12") add_definitions(-DVERSION="0.6.14")
add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fdiagnostics-color=always -I ${CMAKE_SOURCE_DIR}/src) add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fdiagnostics-color=always -I ${CMAKE_SOURCE_DIR}/src)
if (${WITH_ASAN}) if (${WITH_ASAN})
add_definitions(-fsanitize=address -fno-omit-frame-pointer) add_definitions(-fsanitize=address -fno-omit-frame-pointer)

View File

@@ -1,3 +1,5 @@
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <net/if.h> #include <net/if.h>
#include <sys/types.h> #include <sys/types.h>
@@ -9,7 +11,7 @@
#include "addr_util.h" #include "addr_util.h"
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr) bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr_storage *addr)
{ {
if (parse_port) if (parse_port)
{ {
@@ -25,7 +27,7 @@ bool string_to_addr(std::string str, bool parse_port, int default_port, struct s
} }
if (inet_pton(AF_INET, str.c_str(), &((struct sockaddr_in*)addr)->sin_addr) == 1) if (inet_pton(AF_INET, str.c_str(), &((struct sockaddr_in*)addr)->sin_addr) == 1)
{ {
addr->sa_family = AF_INET; addr->ss_family = AF_INET;
((struct sockaddr_in*)addr)->sin_port = htons(default_port); ((struct sockaddr_in*)addr)->sin_port = htons(default_port);
return true; return true;
} }
@@ -33,30 +35,30 @@ bool string_to_addr(std::string str, bool parse_port, int default_port, struct s
str = str.substr(1, str.length()-2); str = str.substr(1, str.length()-2);
if (inet_pton(AF_INET6, str.c_str(), &((struct sockaddr_in6*)addr)->sin6_addr) == 1) if (inet_pton(AF_INET6, str.c_str(), &((struct sockaddr_in6*)addr)->sin6_addr) == 1)
{ {
addr->sa_family = AF_INET6; addr->ss_family = AF_INET6;
((struct sockaddr_in6*)addr)->sin6_port = htons(default_port); ((struct sockaddr_in6*)addr)->sin6_port = htons(default_port);
return true; return true;
} }
return false; return false;
} }
std::string addr_to_string(const sockaddr &addr) std::string addr_to_string(const sockaddr_storage &addr)
{ {
char peer_str[256]; char peer_str[256];
bool ok = false; bool ok = false;
int port; int port;
if (addr.sa_family == AF_INET) if (addr.ss_family == AF_INET)
{ {
ok = !!inet_ntop(AF_INET, &((sockaddr_in*)&addr)->sin_addr, peer_str, 256); ok = !!inet_ntop(AF_INET, &((sockaddr_in*)&addr)->sin_addr, peer_str, 256);
port = ntohs(((sockaddr_in*)&addr)->sin_port); port = ntohs(((sockaddr_in*)&addr)->sin_port);
} }
else if (addr.sa_family == AF_INET6) else if (addr.ss_family == AF_INET6)
{ {
ok = !!inet_ntop(AF_INET6, &((sockaddr_in6*)&addr)->sin6_addr, peer_str, 256); ok = !!inet_ntop(AF_INET6, &((sockaddr_in6*)&addr)->sin6_addr, peer_str, 256);
port = ntohs(((sockaddr_in6*)&addr)->sin6_port); port = ntohs(((sockaddr_in6*)&addr)->sin6_port);
} }
else else
throw std::runtime_error("Unknown address family "+std::to_string(addr.sa_family)); throw std::runtime_error("Unknown address family "+std::to_string(addr.ss_family));
if (!ok) if (!ok)
throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno)); throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
return std::string(peer_str)+":"+std::to_string(port); return std::string(peer_str)+":"+std::to_string(port);
@@ -186,3 +188,51 @@ std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg, bool
freeifaddrs(list); freeifaddrs(list);
return addresses; return addresses;
} }
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port)
{
sockaddr_storage addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (listening_port)
{
if (bind_port == 0)
{
socklen_t len = sizeof(addr);
if (getsockname(listen_fd, (sockaddr *)&addr, &len) == -1)
{
close(listen_fd);
throw std::runtime_error(std::string("getsockname: ") + strerror(errno));
}
*listening_port = ntohs(((sockaddr_in*)&addr)->sin_port);
}
else
{
*listening_port = bind_port;
}
}
if (listen(listen_fd, listen_backlog ? listen_backlog : 128) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
return listen_fd;
}

View File

@@ -4,6 +4,7 @@
#include <string> #include <string>
#include <vector> #include <vector>
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr); bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr_storage *addr);
std::string addr_to_string(const sockaddr &addr); std::string addr_to_string(const sockaddr_storage &addr);
std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg = std::vector<std::string>(), bool include_v6 = false); std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg = std::vector<std::string>(), bool include_v6 = false);
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port);

View File

@@ -21,7 +21,7 @@
// Memory alignment for direct I/O (usually 512 bytes) // Memory alignment for direct I/O (usually 512 bytes)
// All other alignments must be a multiple of this one // All other alignments must be a multiple of this one
#ifndef MEM_ALIGNMENT #ifndef MEM_ALIGNMENT
#define MEM_ALIGNMENT 512 #define MEM_ALIGNMENT 4096
#endif #endif
// Default block size is 128 KB, current allowed range is 4K - 128M // Default block size is 128 KB, current allowed range is 4K - 128M

View File

@@ -154,7 +154,7 @@ resume_1:
if (pool_it != parent->cli->st_cli.pool_config.end()) if (pool_it != parent->cli->st_cli.pool_config.end())
{ {
auto & pool_cfg = pool_it->second; auto & pool_cfg = pool_it->second;
used_size = used_size / pool_pg_real_size[pool_id] used_size = used_size / (pool_pg_real_size[pool_id] ? pool_pg_real_size[pool_id] : 1)
* (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks); * (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
} }
auto stat_it = stats.find(inode_num); auto stat_it = stats.find(inode_num);

View File

@@ -96,7 +96,8 @@ struct rm_inode_t
{ {
osd_op_t *op = new osd_op_t(); osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT; op->op_type = OSD_OP_OUT;
op->peer_fd = parent->cli->msgr.osd_peer_fds[cur_list->rm_osd_num]; // Already checked that it exists above, but anyway
op->peer_fd = parent->cli->msgr.osd_peer_fds.at(cur_list->rm_osd_num);
op->req = (osd_any_op_t){ op->req = (osd_any_op_t){
.rw = { .rw = {
.header = { .header = {

View File

@@ -143,7 +143,7 @@ void cluster_client_t::calc_wait(cluster_op_t *op)
} }
else /* if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP) */ else /* if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP) */
{ {
for (auto prev = op->prev; prev; prev = prev->prev) for (auto prev = op_queue_head; prev && prev != op; prev = prev->next)
{ {
if (prev->opcode == OSD_OP_WRITE && prev->flags & OP_FLUSH_BUFFER) if (prev->opcode == OSD_OP_WRITE && prev->flags & OP_FLUSH_BUFFER)
{ {
@@ -151,7 +151,7 @@ void cluster_client_t::calc_wait(cluster_op_t *op)
} }
else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ || prev->opcode == OSD_OP_READ_BITMAP) else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ || prev->opcode == OSD_OP_READ_BITMAP)
{ {
// Flushes are always in the beginning // Flushes are always in the beginning (we're scanning from the beginning of the queue)
break; break;
} }
} }
@@ -172,6 +172,7 @@ void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *n
(next->opcode == OSD_OP_READ || next->opcode == OSD_OP_READ_BITMAP) && (flags & OP_FLUSH_BUFFER)) (next->opcode == OSD_OP_READ || next->opcode == OSD_OP_READ_BITMAP) && (flags & OP_FLUSH_BUFFER))
{ {
next->prev_wait += inc; next->prev_wait += inc;
assert(next->prev_wait >= 0);
if (!next->prev_wait) if (!next->prev_wait)
{ {
if (next->opcode == OSD_OP_SYNC) if (next->opcode == OSD_OP_SYNC)
@@ -191,6 +192,7 @@ void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *n
if (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE) if (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE)
{ {
next->prev_wait += inc; next->prev_wait += inc;
assert(next->prev_wait >= 0);
if (!next->prev_wait) if (!next->prev_wait)
{ {
if (next->opcode == OSD_OP_SYNC) if (next->opcode == OSD_OP_SYNC)

View File

@@ -200,7 +200,8 @@ void cluster_client_t::send_list(inode_list_osd_t *cur_list)
auto & pool_cfg = st_cli.pool_config[cur_list->pg->lst->pool_id]; auto & pool_cfg = st_cli.pool_config[cur_list->pg->lst->pool_id];
osd_op_t *op = new osd_op_t(); osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT; op->op_type = OSD_OP_OUT;
op->peer_fd = msgr.osd_peer_fds[cur_list->osd_num]; // Already checked that it exists above, but anyway
op->peer_fd = msgr.osd_peer_fds.at(cur_list->osd_num);
op->req = (osd_any_op_t){ op->req = (osd_any_op_t){
.sec_list = { .sec_list = {
.header = { .header = {

View File

@@ -351,9 +351,9 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
} }
else else
{ {
printf("+++ %s 0x%lx 0x%llx+%llx\n", printf("+++ %s 0x%lx 0x%llx+%lx\n",
io->ddir == DDIR_READ ? "READ" : "WRITE", io->ddir == DDIR_READ ? "READ" : "WRITE",
(uint64_t)io, io->offset, io->xfer_buflen); (uint64_t)io, io->offset, (uint64_t)io->xfer_buflen);
} }
} }

View File

@@ -170,14 +170,14 @@ static int sec_init(struct thread_data *td)
bsd->block_order = o->block_order == 0 ? 17 : o->block_order; bsd->block_order = o->block_order == 0 ? 17 : o->block_order;
bsd->block_size = 1 << o->block_order; bsd->block_size = 1 << o->block_order;
sockaddr addr; sockaddr_storage addr;
if (!string_to_addr(std::string(o->host ? o->host : "127.0.0.1"), false, o->port > 0 ? o->port : 11203, &addr)) if (!string_to_addr(std::string(o->host ? o->host : "127.0.0.1"), false, o->port > 0 ? o->port : 11203, &addr))
{ {
fprintf(stderr, "server address: %s is not valid\n", o->host ? o->host : "127.0.0.1"); fprintf(stderr, "server address: %s is not valid\n", o->host ? o->host : "127.0.0.1");
return 1; return 1;
} }
bsd->connect_fd = socket(addr.sa_family, SOCK_STREAM, 0); bsd->connect_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (bsd->connect_fd < 0) if (bsd->connect_fd < 0)
{ {
perror("socket"); perror("socket");
@@ -355,7 +355,7 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int
{ {
if (reply.hdr.retval != io->xfer_buflen) if (reply.hdr.retval != io->xfer_buflen)
{ {
fprintf(stderr, "Short read: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen); fprintf(stderr, "Short read: retval = %ld instead of %lu\n", reply.hdr.retval, (uint64_t)io->xfer_buflen);
exit(1); exit(1);
} }
// Support bitmap // Support bitmap
@@ -380,7 +380,7 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int
{ {
if (reply.hdr.retval != io->xfer_buflen) if (reply.hdr.retval != io->xfer_buflen)
{ {
fprintf(stderr, "Short write: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen); fprintf(stderr, "Short write: retval = %ld instead of %lu\n", reply.hdr.retval, (uint64_t)io->xfer_buflen);
exit(1); exit(1);
} }
} }

View File

@@ -271,7 +271,7 @@ void http_co_t::close_connection()
void http_co_t::start_connection() void http_co_t::start_connection()
{ {
stackin(); stackin();
struct sockaddr addr; struct sockaddr_storage addr;
if (!string_to_addr(host.c_str(), 1, 80, &addr)) if (!string_to_addr(host.c_str(), 1, 80, &addr))
{ {
parsed = { .error = "Invalid address: "+host }; parsed = { .error = "Invalid address: "+host };
@@ -279,7 +279,7 @@ void http_co_t::start_connection()
stackout(); stackout();
return; return;
} }
peer_fd = socket(addr.sa_family, SOCK_STREAM, 0); peer_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (peer_fd < 0) if (peer_fd < 0)
{ {
parsed = { .error = std::string("socket: ")+strerror(errno) }; parsed = { .error = std::string("socket: ")+strerror(errno) };

View File

@@ -222,13 +222,13 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port) void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
{ {
assert(peer_osd != this->osd_num); assert(peer_osd != this->osd_num);
struct sockaddr addr; struct sockaddr_storage addr;
if (!string_to_addr(peer_host, 0, peer_port, &addr)) if (!string_to_addr(peer_host, 0, peer_port, &addr))
{ {
on_connect_peer(peer_osd, -EINVAL); on_connect_peer(peer_osd, -EINVAL);
return; return;
} }
int peer_fd = socket(addr.sa_family, SOCK_STREAM, 0); int peer_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (peer_fd < 0) if (peer_fd < 0)
{ {
on_connect_peer(peer_osd, -errno); on_connect_peer(peer_osd, -errno);
@@ -484,10 +484,10 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
void osd_messenger_t::accept_connections(int listen_fd) void osd_messenger_t::accept_connections(int listen_fd)
{ {
// Accept new connections // Accept new connections
sockaddr addr; sockaddr_storage addr;
socklen_t peer_addr_size = sizeof(addr); socklen_t peer_addr_size = sizeof(addr);
int peer_fd; int peer_fd;
while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0) while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
{ {
assert(peer_fd != 0); assert(peer_fd != 0);
fprintf(stderr, "[OSD %lu] new client %d: connection from %s\n", this->osd_num, peer_fd, fprintf(stderr, "[OSD %lu] new client %d: connection from %s\n", this->osd_num, peer_fd,

View File

@@ -49,7 +49,7 @@ struct osd_client_t
{ {
int refs = 0; int refs = 0;
sockaddr peer_addr; sockaddr_storage peer_addr;
int peer_port; int peer_port;
int peer_fd; int peer_fd;
int peer_state; int peer_state;

View File

@@ -3,7 +3,6 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <sys/mman.h>
#include "msgr_rdma.h" #include "msgr_rdma.h"
#include "messenger.h" #include "messenger.h"
@@ -55,7 +54,6 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level) msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level)
{ {
int res; int res;
bool odp = true;
ibv_device **dev_list = NULL; ibv_device **dev_list = NULL;
msgr_rdma_context_t *ctx = new msgr_rdma_context_t(); msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
ctx->mtu = mtu; ctx->mtu = mtu;
@@ -119,9 +117,9 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev)); fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev));
goto cleanup; goto cleanup;
} }
if ((res = ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid)) != 0) if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid))
{ {
fprintf(stderr, "Couldn't read RDMA device %s GID index %d: %s\n", ibv_get_device_name(ctx->dev), gid_index, strerror(res)); fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index);
goto cleanup; goto cleanup;
} }
@@ -133,9 +131,9 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
} }
{ {
if ((res = ibv_query_device_ex(ctx->context, NULL, &ctx->attrx)) != 0) if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx))
{ {
fprintf(stderr, "Couldn't query RDMA device %s for its features: %s\n", ibv_get_device_name(ctx->dev), strerror(res)); fprintf(stderr, "Couldn't query RDMA device for its features\n");
goto cleanup; goto cleanup;
} }
if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) || if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
@@ -143,20 +141,15 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) || !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV)) !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
{ {
fprintf(stderr, "Warning: RDMA device isn't implicit ODP (On-Demand Paging) capable, trying to lock all application memory\n"); fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n");
if (mlockall(MCL_CURRENT|MCL_FUTURE|MCL_ONFAULT) != 0) goto cleanup;
{
fprintf(stderr, "mlockall() failed: %s\n", strerror(errno));
goto cleanup;
}
odp = false;
} }
} }
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | (odp ? IBV_ACCESS_ON_DEMAND : 0)); ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
if (!ctx->mr) if (!ctx->mr)
{ {
fprintf(stderr, "Couldn't register RDMA memory region: %s\n", strerror(errno)); fprintf(stderr, "Couldn't register RDMA memory region\n");
goto cleanup; goto cleanup;
} }

View File

@@ -363,7 +363,8 @@ public:
setsid(); setsid();
if (fork()) if (fork())
exit(0); exit(0);
chdir("/"); if (chdir("/") != 0)
fprintf(stderr, "Warning: Failed to chdir into /\n");
close(0); close(0);
close(1); close(1);
close(2); close(2);
@@ -525,7 +526,11 @@ protected:
{ {
goto end_unmap; goto end_unmap;
} }
write(qd_fd, "32768", 5); r = write(qd_fd, "32768", 5);
if (r != 5)
{
fprintf(stderr, "Warning: Failed to configure max_sectors_kb\n");
}
close(qd_fd); close(qd_fd);
if (!fork()) if (!fork())
{ {

View File

@@ -57,7 +57,11 @@ osd_t::osd_t(const json11::Json & config, ring_loop_t *ringloop)
if (this->config["osd_memlock"] == "true" || this->config["osd_memlock"] == "1" || this->config["osd_memlock"] == "yes") if (this->config["osd_memlock"] == "true" || this->config["osd_memlock"] == "1" || this->config["osd_memlock"] == "yes")
{ {
// Lock all OSD memory if requested // Lock all OSD memory if requested
if (mlockall(MCL_CURRENT|MCL_FUTURE|MCL_ONFAULT) != 0) if (mlockall(MCL_CURRENT|MCL_FUTURE
#ifdef MCL_ONFAULT
| MCL_ONFAULT
#endif
) != 0)
{ {
fprintf(stderr, "osd_memlock is set to true, but mlockall() failed: %s\n", strerror(errno)); fprintf(stderr, "osd_memlock is set to true, but mlockall() failed: %s\n", strerror(errno));
exit(-1); exit(-1);
@@ -196,46 +200,7 @@ void osd_t::bind_socket()
// FIXME Support multiple listening sockets // FIXME Support multiple listening sockets
sockaddr addr; listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (bind_port == 0)
{
socklen_t len = sizeof(addr);
if (getsockname(listen_fd, (sockaddr *)&addr, &len) == -1)
{
close(listen_fd);
throw std::runtime_error(std::string("getsockname: ") + strerror(errno));
}
listening_port = ntohs(((sockaddr_in*)&addr)->sin_port);
}
else
{
listening_port = bind_port;
}
if (listen(listen_fd, listen_backlog) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events) epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)

View File

@@ -211,7 +211,7 @@ class osd_t
// flushing, recovery and backfill // flushing, recovery and backfill
void submit_pg_flush_ops(pg_t & pg); void submit_pg_flush_ops(pg_t & pg);
void handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval); void handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval);
void submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data); bool submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data);
bool pick_next_recovery(osd_recovery_op_t &op); bool pick_next_recovery(osd_recovery_op_t &op);
void submit_recovery_op(osd_recovery_op_t *op); void submit_recovery_op(osd_recovery_op_t *op);
bool continue_recovery(); bool continue_recovery();

View File

@@ -47,7 +47,8 @@ void osd_t::submit_pg_flush_ops(pg_t & pg)
if (l.second.size() > 0) if (l.second.size() > 0)
{ {
fb->flush_ops++; fb->flush_ops++;
submit_flush_op(pg.pool_id, pg.pg_num, fb, true, l.first, l.second.size(), l.second.data()); if (!submit_flush_op(pg.pool_id, pg.pg_num, fb, true, l.first, l.second.size(), l.second.data()))
return;
} }
} }
for (auto & l: fb->stable_lists) for (auto & l: fb->stable_lists)
@@ -55,7 +56,8 @@ void osd_t::submit_pg_flush_ops(pg_t & pg)
if (l.second.size() > 0) if (l.second.size() > 0)
{ {
fb->flush_ops++; fb->flush_ops++;
submit_flush_op(pg.pool_id, pg.pg_num, fb, false, l.first, l.second.size(), l.second.data()); if (!submit_flush_op(pg.pool_id, pg.pg_num, fb, false, l.first, l.second.size(), l.second.data()))
return;
} }
} }
} }
@@ -160,7 +162,7 @@ void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, p
} }
} }
void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data) bool osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data)
{ {
osd_op_t *op = new osd_op_t(); osd_op_t *op = new osd_op_t();
// Copy buffer so it gets freed along with the operation // Copy buffer so it gets freed along with the operation
@@ -188,10 +190,8 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
else else
{ {
// Peer // Peer
int peer_fd = msgr.osd_peer_fds[peer_osd];
op->op_type = OSD_OP_OUT; op->op_type = OSD_OP_OUT;
op->iov.push_back(op->buf, count * sizeof(obj_ver_id)); op->iov.push_back(op->buf, count * sizeof(obj_ver_id));
op->peer_fd = peer_fd;
op->req = (osd_any_op_t){ op->req = (osd_any_op_t){
.sec_stab = { .sec_stab = {
.header = { .header = {
@@ -207,8 +207,21 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval); handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval);
delete op; delete op;
}; };
msgr.outbox_push(op); auto peer_fd_it = msgr.osd_peer_fds.find(peer_osd);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
op->peer_fd = peer_fd_it->second;
msgr.outbox_push(op);
}
else
{
// Fail it immediately
op->reply.hdr.retval = -EPIPE;
op->callback(op);
return false;
}
} }
return true;
} }
bool osd_t::pick_next_recovery(osd_recovery_op_t &op) bool osd_t::pick_next_recovery(osd_recovery_op_t &op)

View File

@@ -29,7 +29,7 @@ void osd_t::handle_peers()
degraded_objects += p.second.degraded_objects.size(); degraded_objects += p.second.degraded_objects.size();
if (p.second.state & PG_HAS_UNCLEAN) if (p.second.state & PG_HAS_UNCLEAN)
peering_state = peering_state | OSD_FLUSHING_PGS; peering_state = peering_state | OSD_FLUSHING_PGS;
else if (p.second.state & PG_HAS_DEGRADED) else if (p.second.state & (PG_HAS_DEGRADED | PG_HAS_MISPLACED))
peering_state = peering_state | OSD_RECOVERING; peering_state = peering_state | OSD_RECOVERING;
} }
else else
@@ -340,7 +340,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p
else else
{ {
// Peer // Peer
auto & cl = msgr.clients.at(msgr.osd_peer_fds[role_osd]); auto & cl = msgr.clients.at(msgr.osd_peer_fds.at(role_osd));
osd_op_t *op = new osd_op_t(); osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT; op->op_type = OSD_OP_OUT;
op->peer_fd = cl->peer_fd; op->peer_fd = cl->peer_fd;
@@ -419,7 +419,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
// Peer // Peer
osd_op_t *op = new osd_op_t(); osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT; op->op_type = OSD_OP_OUT;
op->peer_fd = msgr.osd_peer_fds[role_osd]; op->peer_fd = msgr.osd_peer_fds.at(role_osd);
op->req = (osd_any_op_t){ op->req = (osd_any_op_t){
.sec_list = { .sec_list = {
.header = { .header = {

View File

@@ -246,7 +246,6 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
// Send to a remote OSD // Send to a remote OSD
osd_op_t *subop = op_data->subops+subop_idx; osd_op_t *subop = op_data->subops+subop_idx;
subop->op_type = OSD_OP_OUT; subop->op_type = OSD_OP_OUT;
subop->peer_fd = msgr.osd_peer_fds.at(subop_osd_num);
// FIXME: Use the pre-allocated buffer // FIXME: Use the pre-allocated buffer
subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev)); subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev));
subop->req = (osd_any_op_t){ subop->req = (osd_any_op_t){
@@ -287,7 +286,18 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
} }
handle_primary_subop(subop, cur_op); handle_primary_subop(subop, cur_op);
}; };
msgr.outbox_push(subop); auto peer_fd_it = msgr.osd_peer_fds.find(subop_osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subop->peer_fd = peer_fd_it->second;
msgr.outbox_push(subop);
}
else
{
// Fail it immediately
subop->reply.hdr.retval = -EPIPE;
subop->callback(subop);
}
subop_idx++; subop_idx++;
} }
prev = i+1; prev = i+1;

View File

@@ -182,7 +182,6 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
else else
{ {
subop->op_type = OSD_OP_OUT; subop->op_type = OSD_OP_OUT;
subop->peer_fd = msgr.osd_peer_fds.at(role_osd_num);
subop->bitmap = stripes[stripe_num].bmp_buf; subop->bitmap = stripes[stripe_num].bmp_buf;
subop->bitmap_len = clean_entry_bitmap_size; subop->bitmap_len = clean_entry_bitmap_size;
subop->req.sec_rw = { subop->req.sec_rw = {
@@ -225,7 +224,18 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
{ {
handle_primary_subop(subop, cur_op); handle_primary_subop(subop, cur_op);
}; };
msgr.outbox_push(subop); auto peer_fd_it = msgr.osd_peer_fds.find(role_osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subop->peer_fd = peer_fd_it->second;
msgr.outbox_push(subop);
}
else
{
// Fail it immediately
subop->reply.hdr.retval = -EPIPE;
subop->callback(subop);
}
} }
i++; i++;
} }
@@ -463,7 +473,6 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
else else
{ {
subops[i].op_type = OSD_OP_OUT; subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = msgr.osd_peer_fds.at(chunk.osd_num);
subops[i].req = (osd_any_op_t){ .sec_del = { subops[i].req = (osd_any_op_t){ .sec_del = {
.header = { .header = {
.magic = SECONDARY_OSD_OP_MAGIC, .magic = SECONDARY_OSD_OP_MAGIC,
@@ -477,7 +486,18 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
{ {
handle_primary_subop(subop, cur_op); handle_primary_subop(subop, cur_op);
}; };
msgr.outbox_push(&subops[i]); auto peer_fd_it = msgr.osd_peer_fds.find(chunk.osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subops[i].peer_fd = peer_fd_it->second;
msgr.outbox_push(&subops[i]);
}
else
{
// Fail it immediately
subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]);
}
} }
} }
} }
@@ -567,7 +587,6 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
else else
{ {
subops[i].op_type = OSD_OP_OUT; subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = msgr.osd_peer_fds.at(stab_osd.osd_num);
subops[i].req = (osd_any_op_t){ .sec_stab = { subops[i].req = (osd_any_op_t){ .sec_stab = {
.header = { .header = {
.magic = SECONDARY_OSD_OP_MAGIC, .magic = SECONDARY_OSD_OP_MAGIC,
@@ -581,7 +600,18 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
{ {
handle_primary_subop(subop, cur_op); handle_primary_subop(subop, cur_op);
}; };
msgr.outbox_push(&subops[i]); auto peer_fd_it = msgr.osd_peer_fds.find(stab_osd.osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subops[i].peer_fd = peer_fd_it->second;
msgr.outbox_push(&subops[i]);
}
else
{
// Fail it immediately
subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]);
}
} }
} }
} }

View File

@@ -8,7 +8,7 @@
#include "osd_id.h" #include "osd_id.h"
#ifndef MEM_ALIGNMENT #ifndef MEM_ALIGNMENT
#define MEM_ALIGNMENT 512 #define MEM_ALIGNMENT 4096
#endif #endif
struct buf_len_t struct buf_len_t

View File

@@ -134,14 +134,14 @@ int main(int narg, char *args[])
int connect_osd(const char *osd_address, int osd_port) int connect_osd(const char *osd_address, int osd_port)
{ {
struct sockaddr addr; struct sockaddr_storage addr;
if (!string_to_addr(osd_address, 0, osd_port, &addr)) if (!string_to_addr(osd_address, 0, osd_port, &addr))
{ {
fprintf(stderr, "server address: %s is not valid\n", osd_address); fprintf(stderr, "server address: %s is not valid\n", osd_address);
return -1; return -1;
} }
int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0); int connect_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (connect_fd < 0) if (connect_fd < 0)
{ {
perror("socket"); perror("socket");

View File

@@ -67,14 +67,14 @@ int main(int narg, char *args[])
int connect_stub(const char *server_address, int server_port) int connect_stub(const char *server_address, int server_port)
{ {
struct sockaddr addr; struct sockaddr_storage addr;
if (!string_to_addr(server_address, 0, server_port, &addr)) if (!string_to_addr(server_address, 0, server_port, &addr))
{ {
fprintf(stderr, "server address: %s is not valid\n", server_address); fprintf(stderr, "server address: %s is not valid\n", server_address);
return -1; return -1;
} }
int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0); int connect_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (connect_fd < 0) if (connect_fd < 0)
{ {
perror("socket"); perror("socket");

View File

@@ -41,21 +41,19 @@
#include "rw_blocking.h" #include "rw_blocking.h"
#include "osd_ops.h" #include "osd_ops.h"
int bind_stub(std::string bind_address, int bind_port);
void run_stub(int peer_fd); void run_stub(int peer_fd);
int main(int narg, char *args[]) int main(int narg, char *args[])
{ {
int listen_fd = bind_stub("0.0.0.0", 11203); int listen_fd = create_and_bind_socket("0.0.0.0", 11203, 128, NULL);
// Accept new connections // Accept new connections
sockaddr addr; sockaddr_storage addr;
socklen_t peer_addr_size = sizeof(addr); socklen_t peer_addr_size = sizeof(addr);
int peer_fd; int peer_fd;
while (1) while (1)
{ {
printf("stub_osd: waiting for 1 client\n"); printf("stub_osd: waiting for 1 client\n");
peer_fd = accept(listen_fd, &addr, &peer_addr_size); peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size);
if (peer_fd == -1) if (peer_fd == -1)
{ {
if (errno == EAGAIN) if (errno == EAGAIN)
@@ -76,39 +74,6 @@ int main(int narg, char *args[])
return 0; return 0;
} }
int bind_stub(std::string bind_address, int bind_port)
{
int listen_backlog = 128;
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (listen(listen_fd, listen_backlog) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
return listen_fd;
}
void run_stub(int peer_fd) void run_stub(int peer_fd)
{ {
osd_any_op_t op; osd_any_op_t op;

View File

@@ -25,8 +25,6 @@
#include "epoll_manager.h" #include "epoll_manager.h"
#include "messenger.h" #include "messenger.h"
int bind_stub(std::string bind_address, int bind_port);
void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op); void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op);
int main(int narg, char *args[]) int main(int narg, char *args[])
@@ -43,7 +41,8 @@ int main(int narg, char *args[])
json11::Json config = json11::Json::object { { "log_level", 1 } }; json11::Json config = json11::Json::object { { "log_level", 1 } };
msgr->parse_config(config); msgr->parse_config(config);
// Accept new connections // Accept new connections
int listen_fd = bind_stub("0.0.0.0", 11203); int listen_fd = create_and_bind_socket("0.0.0.0", 11203, 128, NULL);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [listen_fd, msgr](int fd, int events) epmgr->set_fd_handler(listen_fd, false, [listen_fd, msgr](int fd, int events)
{ {
msgr->accept_connections(listen_fd); msgr->accept_connections(listen_fd);
@@ -67,41 +66,6 @@ int main(int narg, char *args[])
return 0; return 0;
} }
int bind_stub(std::string bind_address, int bind_port)
{
int listen_backlog = 128;
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (listen(listen_fd, listen_backlog) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
return listen_fd;
}
void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op) void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op)
{ {
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;

View File

@@ -6,7 +6,7 @@ includedir=${prefix}/@CMAKE_INSTALL_INCLUDEDIR@
Name: Vitastor Name: Vitastor
Description: Vitastor client library Description: Vitastor client library
Version: 0.6.12 Version: 0.6.14
Libs: -L${libdir} -lvitastor_client Libs: -L${libdir} -lvitastor_client
Cflags: -I${includedir} Cflags: -I${includedir}