Compare commits

..

1 Commits

Author SHA1 Message Date
043ed854f3 Support RDMA devices without Implicit ODP using mlockall()
UPD: Seems it won't work because ibv_reg_mr() takes a permissions argument
and doesn't allow more permissions than allowed by the kernel for memory
mappings. So the only way to register all memory is probably to iterate
over /proc/PID/maps... :)

Mellanox docs mention that older MLNX_OFED emulated ODP, so maybe it's
still possible to use it, but it's not confirmed.
2022-02-02 01:40:29 +03:00
53 changed files with 1068 additions and 1227 deletions

View File

@@ -2,6 +2,6 @@ cmake_minimum_required(VERSION 2.8)
project(vitastor)
set(VERSION "0.6.15")
set(VERSION "0.6.12")
add_subdirectory(src)

View File

@@ -1,4 +1,4 @@
VERSION ?= v0.6.15
VERSION ?= v0.6.12
all: build push

View File

@@ -49,7 +49,7 @@ spec:
capabilities:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
image: vitalif/vitastor-csi:v0.6.15
image: vitalif/vitastor-csi:v0.6.12
args:
- "--node=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)"

View File

@@ -116,7 +116,7 @@ spec:
privileged: true
capabilities:
add: ["SYS_ADMIN"]
image: vitalif/vitastor-csi:v0.6.15
image: vitalif/vitastor-csi:v0.6.12
args:
- "--node=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)"

View File

@@ -5,7 +5,7 @@ package vitastor
const (
vitastorCSIDriverName = "csi.vitastor.io"
vitastorCSIDriverVersion = "0.6.15"
vitastorCSIDriverVersion = "0.6.12"
)
// Config struct fills the parameters of request or user input

2
debian/changelog vendored
View File

@@ -1,4 +1,4 @@
vitastor (0.6.15-1) unstable; urgency=medium
vitastor (0.6.12-1) unstable; urgency=medium
* RDMA support
* Bugfixes

View File

@@ -33,8 +33,8 @@ RUN set -e -x; \
mkdir -p /root/packages/vitastor-$REL; \
rm -rf /root/packages/vitastor-$REL/*; \
cd /root/packages/vitastor-$REL; \
cp -r /root/vitastor vitastor-0.6.15; \
cd vitastor-0.6.15; \
cp -r /root/vitastor vitastor-0.6.12; \
cd vitastor-0.6.12; \
ln -s /root/fio-build/fio-*/ ./fio; \
FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
ls /usr/include/linux/raw.h || cp ./debian/raw.h /usr/include/linux/raw.h; \
@@ -47,8 +47,8 @@ RUN set -e -x; \
rm -rf a b; \
echo "dep:fio=$FIO" > debian/fio_version; \
cd /root/packages/vitastor-$REL; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.15.orig.tar.xz vitastor-0.6.15; \
cd vitastor-0.6.15; \
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.6.12.orig.tar.xz vitastor-0.6.12; \
cd vitastor-0.6.12; \
V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \

View File

@@ -48,19 +48,28 @@
type: string
info: |
RDMA device name to use for Vitastor OSD communications (for example,
"rocep5s0f0"). Please note that Vitastor RDMA requires Implicit On-Demand
Paging (Implicit ODP) and Scatter/Gather (SG) support from the RDMA device
to work. For example, Mellanox ConnectX-3 and older adapters don't have
Implicit ODP, so they're unsupported by Vitastor. Run `ibv_devinfo -v` as
root to list available RDMA devices and their features.
"rocep5s0f0"). Please note that if your RDMA device doesn't support
Implicit ODP (Implicit On-Demand Paging) then all Vitastor OSDs and clients
will have to use mlockall() to lock all application memory to use RDMA.
In case of the native Vitastor QEMU driver with RDMA, all virtual machine
memory will be locked if your RDMA device doesn't support Implicit ODP.
Notably, Mellanox ConnectX-3 and older adapters don't support Implicit ODP,
while ConnectX-4 and newer do. Run `ibv_devinfo -v` as root to list
available RDMA devices and their features.
info_ru: |
Название RDMA-устройства для связи с Vitastor OSD (например, "rocep5s0f0").
Имейте в виду, что поддержка RDMA в Vitastor требует функций устройства
Implicit On-Demand Paging (Implicit ODP) и Scatter/Gather (SG). Например,
адаптеры Mellanox ConnectX-3 и более старые не поддерживают Implicit ODP и
потому не поддерживаются в Vitastor. Запустите `ibv_devinfo -v` от имени
суперпользователя, чтобы посмотреть список доступных RDMA-устройств, их
параметры и возможности.
Имейте в виду, что если ваше устройство не поддерживает Implicit ODP
(Implicit On-Demand Paging), то все OSD и клиенты Vitastor будут вынуждены
блокировать всю память приложения с помощью mlockall(), чтобы задействовать
RDMA. В случае нативного QEMU-драйвера это будет означать, что при
использовании RDMA на устройстве без поддержки Implicit ODP блокироваться
от выгрузки будет вся память виртуальных машин.
В случае с адаптерами Mellanox Implicit ODP поддерживается начиная с
ConnectX-4. ConnectX-3 и более старые адаптеры не поддерживают Implicit ODP.
Чтобы посмотреть список своих RDMA-устройств и их возможностей, запустите
`ibv_devinfo -v` от имени суперпользователя.
- name: rdma_port_num
type: int
default: 1

View File

@@ -1345,30 +1345,21 @@ class Mon
const tm = prev_stats ? BigInt(timestamp - prev_stats.timestamp) : 0;
for (const op in op_stats)
{
if (prev_stats && prev_stats.op_stats && prev_stats.op_stats[op])
{
op_stats[op].bps = (op_stats[op].bytes - prev_stats.op_stats[op].bytes) * 1000n / tm;
op_stats[op].iops = (op_stats[op].count - prev_stats.op_stats[op].count) * 1000n / tm;
op_stats[op].lat = (op_stats[op].usec - prev_stats.op_stats[op].usec)
/ ((op_stats[op].count - prev_stats.op_stats[op].count) || 1n);
}
op_stats[op].bps = prev_stats ? (op_stats[op].bytes - prev_stats.op_stats[op].bytes) * 1000n / tm : 0;
op_stats[op].iops = prev_stats ? (op_stats[op].count - prev_stats.op_stats[op].count) * 1000n / tm : 0;
op_stats[op].lat = prev_stats ? (op_stats[op].usec - prev_stats.op_stats[op].usec)
/ ((op_stats[op].count - prev_stats.op_stats[op].count) || 1n) : 0;
}
for (const op in subop_stats)
{
if (prev_stats && prev_stats.subop_stats && prev_stats.subop_stats[op])
{
subop_stats[op].iops = (subop_stats[op].count - prev_stats.subop_stats[op].count) * 1000n / tm;
subop_stats[op].lat = (subop_stats[op].usec - prev_stats.subop_stats[op].usec)
/ ((subop_stats[op].count - prev_stats.subop_stats[op].count) || 1n);
}
subop_stats[op].iops = prev_stats ? (subop_stats[op].count - prev_stats.subop_stats[op].count) * 1000n / tm : 0;
subop_stats[op].lat = prev_stats ? (subop_stats[op].usec - prev_stats.subop_stats[op].usec)
/ ((subop_stats[op].count - prev_stats.subop_stats[op].count) || 1n) : 0;
}
for (const op in recovery_stats)
{
if (prev_stats && prev_stats.recovery_stats && prev_stats.recovery_stats[op])
{
recovery_stats[op].bps = (recovery_stats[op].bytes - prev_stats.recovery_stats[op].bytes) * 1000n / tm;
recovery_stats[op].iops = (recovery_stats[op].count - prev_stats.recovery_stats[op].count) * 1000n / tm;
}
recovery_stats[op].bps = prev_stats ? (recovery_stats[op].bytes - prev_stats.recovery_stats[op].bytes) * 1000n / tm : 0;
recovery_stats[op].iops = prev_stats ? (recovery_stats[op].count - prev_stats.recovery_stats[op].count) * 1000n / tm : 0;
}
return { op_stats, subop_stats, recovery_stats };
}

View File

@@ -50,7 +50,7 @@ from cinder.volume import configuration
from cinder.volume import driver
from cinder.volume import volume_utils
VERSION = '0.6.15'
VERSION = '0.6.12'
LOG = logging.getLogger(__name__)

View File

@@ -25,4 +25,4 @@ rm fio
mv fio-copy fio
FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
tar --transform 's#^#vitastor-0.6.15/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.15$(rpm --eval '%dist').tar.gz *
tar --transform 's#^#vitastor-0.6.12/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.6.12$(rpm --eval '%dist').tar.gz *

View File

@@ -34,7 +34,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.6.15.el7.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.6.12.el7.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 0.6.15
Version: 0.6.12
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.6.15.el7.tar.gz
Source0: vitastor-0.6.12.el7.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel

View File

@@ -33,7 +33,7 @@ ADD . /root/vitastor
RUN set -e; \
cd /root/vitastor/rpm; \
sh build-tarball.sh; \
cp /root/vitastor-0.6.15.el8.tar.gz ~/rpmbuild/SOURCES; \
cp /root/vitastor-0.6.12.el8.tar.gz ~/rpmbuild/SOURCES; \
cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
cd ~/rpmbuild/SPECS/; \
rpmbuild -ba vitastor.spec; \

View File

@@ -1,11 +1,11 @@
Name: vitastor
Version: 0.6.15
Version: 0.6.12
Release: 1%{?dist}
Summary: Vitastor, a fast software-defined clustered block storage
License: Vitastor Network Public License 1.1
URL: https://vitastor.io/
Source0: vitastor-0.6.15.el8.tar.gz
Source0: vitastor-0.6.12.el8.tar.gz
BuildRequires: liburing-devel >= 0.6
BuildRequires: gperftools-devel

View File

@@ -15,7 +15,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}")
endif()
add_definitions(-DVERSION="0.6.15")
add_definitions(-DVERSION="0.6.12")
add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fdiagnostics-color=always -I ${CMAKE_SOURCE_DIR}/src)
if (${WITH_ASAN})
add_definitions(-fsanitize=address -fno-omit-frame-pointer)
@@ -155,7 +155,7 @@ target_link_libraries(vitastor-nbd
# vitastor-cli
add_executable(vitastor-cli
cli.cpp cli_alloc_osd.cpp cli_simple_offsets.cpp cli_df.cpp
cli_ls.cpp cli_create.cpp cli_modify.cpp cli_flatten.cpp cli_merge.cpp cli_rm_data.cpp cli_rm.cpp
cli_ls.cpp cli_create.cpp cli_modify.cpp cli_flatten.cpp cli_merge.cpp cli_rm.cpp cli_snap_rm.cpp
)
target_link_libraries(vitastor-cli
vitastor_client

View File

@@ -1,5 +1,3 @@
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <sys/types.h>
@@ -11,7 +9,7 @@
#include "addr_util.h"
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr_storage *addr)
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr)
{
if (parse_port)
{
@@ -27,7 +25,7 @@ bool string_to_addr(std::string str, bool parse_port, int default_port, struct s
}
if (inet_pton(AF_INET, str.c_str(), &((struct sockaddr_in*)addr)->sin_addr) == 1)
{
addr->ss_family = AF_INET;
addr->sa_family = AF_INET;
((struct sockaddr_in*)addr)->sin_port = htons(default_port);
return true;
}
@@ -35,30 +33,30 @@ bool string_to_addr(std::string str, bool parse_port, int default_port, struct s
str = str.substr(1, str.length()-2);
if (inet_pton(AF_INET6, str.c_str(), &((struct sockaddr_in6*)addr)->sin6_addr) == 1)
{
addr->ss_family = AF_INET6;
addr->sa_family = AF_INET6;
((struct sockaddr_in6*)addr)->sin6_port = htons(default_port);
return true;
}
return false;
}
std::string addr_to_string(const sockaddr_storage &addr)
std::string addr_to_string(const sockaddr &addr)
{
char peer_str[256];
bool ok = false;
int port;
if (addr.ss_family == AF_INET)
if (addr.sa_family == AF_INET)
{
ok = !!inet_ntop(AF_INET, &((sockaddr_in*)&addr)->sin_addr, peer_str, 256);
port = ntohs(((sockaddr_in*)&addr)->sin_port);
}
else if (addr.ss_family == AF_INET6)
else if (addr.sa_family == AF_INET6)
{
ok = !!inet_ntop(AF_INET6, &((sockaddr_in6*)&addr)->sin6_addr, peer_str, 256);
port = ntohs(((sockaddr_in6*)&addr)->sin6_port);
}
else
throw std::runtime_error("Unknown address family "+std::to_string(addr.ss_family));
throw std::runtime_error("Unknown address family "+std::to_string(addr.sa_family));
if (!ok)
throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
return std::string(peer_str)+":"+std::to_string(port);
@@ -188,51 +186,3 @@ std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg, bool
freeifaddrs(list);
return addresses;
}
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port)
{
sockaddr_storage addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.ss_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (listening_port)
{
if (bind_port == 0)
{
socklen_t len = sizeof(addr);
if (getsockname(listen_fd, (sockaddr *)&addr, &len) == -1)
{
close(listen_fd);
throw std::runtime_error(std::string("getsockname: ") + strerror(errno));
}
*listening_port = ntohs(((sockaddr_in*)&addr)->sin_port);
}
else
{
*listening_port = bind_port;
}
}
if (listen(listen_fd, listen_backlog ? listen_backlog : 128) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
return listen_fd;
}

View File

@@ -4,7 +4,6 @@
#include <string>
#include <vector>
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr_storage *addr);
std::string addr_to_string(const sockaddr_storage &addr);
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr);
std::string addr_to_string(const sockaddr &addr);
std::vector<std::string> getifaddr_list(std::vector<std::string> mask_cfg = std::vector<std::string>(), bool include_v6 = false);
int create_and_bind_socket(std::string bind_address, int bind_port, int listen_backlog, int *listening_port);

View File

@@ -21,7 +21,7 @@
// Memory alignment for direct I/O (usually 512 bytes)
// All other alignments must be a multiple of this one
#ifndef MEM_ALIGNMENT
#define MEM_ALIGNMENT 4096
#define MEM_ALIGNMENT 512
#endif
// Default block size is 128 KB, current allowed range is 4K - 128M

View File

@@ -415,11 +415,8 @@ stop_flusher:
flusher->active_flushers++;
resume_1:
// Find it in clean_db
{
auto & clean_db = bs->clean_db_shard(cur.oid);
auto clean_it = clean_db.find(cur.oid);
old_clean_loc = (clean_it != clean_db.end() ? clean_it->second.location : UINT64_MAX);
}
clean_it = bs->clean_db.find(cur.oid);
old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX);
// Scan dirty versions of the object
if (!scan_dirty(1))
{
@@ -873,11 +870,10 @@ void journal_flusher_co::update_clean_db()
#endif
bs->data_alloc->set(old_clean_loc >> bs->block_order, false);
}
auto & clean_db = bs->clean_db_shard(cur.oid);
if (has_delete)
{
auto clean_it = clean_db.find(cur.oid);
clean_db.erase(clean_it);
auto clean_it = bs->clean_db.find(cur.oid);
bs->clean_db.erase(clean_it);
#ifdef BLOCKSTORE_DEBUG
printf("Free block %lu from %lx:%lx v%lu (delete)\n",
clean_loc >> bs->block_order,
@@ -888,7 +884,7 @@ void journal_flusher_co::update_clean_db()
}
else
{
clean_db[cur.oid] = {
bs->clean_db[cur.oid] = {
.version = cur.version,
.location = clean_loc,
};

View File

@@ -49,6 +49,7 @@ class journal_flusher_co
std::function<void(ring_data_t*)> simple_callback_r, simple_callback_w;
bool skip_copy, has_delete, has_writes;
blockstore_clean_db_t::iterator clean_it;
std::vector<copy_buffer_t> v;
std::vector<copy_buffer_t>::iterator it;
int copy_count;

View File

@@ -118,7 +118,7 @@ void blockstore_impl_t::loop()
// has_writes == 0 - no writes before the current queue item
// has_writes == 1 - some writes in progress
// has_writes == 2 - tried to submit some writes, but failed
int has_writes = 0, op_idx = 0, new_idx = 0, done_lists = 0;
int has_writes = 0, op_idx = 0, new_idx = 0;
for (; op_idx < submit_queue.size(); op_idx++, new_idx++)
{
auto op = submit_queue[op_idx];
@@ -198,14 +198,9 @@ void blockstore_impl_t::loop()
}
else if (op->opcode == BS_OP_LIST)
{
// LIST doesn't have to be blocked by previous modifications
// But don't do a lot of LISTs at once, because they're blocking and potentially slow
if (single_tick_list_limit <= 0 || done_lists < single_tick_list_limit)
{
process_list(op);
done_lists++;
wr_st = 2;
}
// LIST doesn't need to be blocked by previous modifications
process_list(op);
wr_st = 2;
}
if (wr_st == 2)
{
@@ -428,104 +423,22 @@ static bool replace_stable(object_id oid, uint64_t version, int search_start, in
return false;
}
blockstore_clean_db_t& blockstore_impl_t::clean_db_shard(object_id oid)
{
uint64_t pg_num = 0;
uint64_t pool_id = (oid.inode >> (64-POOL_ID_BITS));
auto sh_it = clean_db_settings.find(pool_id);
if (sh_it != clean_db_settings.end())
{
// like map_to_pg()
pg_num = (oid.stripe / sh_it->second.pg_stripe_size) % sh_it->second.pg_count + 1;
}
return clean_db_shards[(pool_id << (64-POOL_ID_BITS)) | pg_num];
}
void blockstore_impl_t::reshard_clean_db(pool_id_t pool, uint32_t pg_count, uint32_t pg_stripe_size)
{
uint64_t pool_id = (uint64_t)pool;
std::map<pool_pg_id_t, blockstore_clean_db_t> new_shards;
auto sh_it = clean_db_shards.lower_bound((pool_id << (64-POOL_ID_BITS)));
while (sh_it != clean_db_shards.end() &&
(sh_it->first >> (64-POOL_ID_BITS)) == pool_id)
{
for (auto & pair: sh_it->second)
{
// like map_to_pg()
uint64_t pg_num = (pair.first.stripe / pg_stripe_size) % pg_count + 1;
uint64_t shard_id = (pool_id << (64-POOL_ID_BITS)) | pg_num;
new_shards[shard_id][pair.first] = pair.second;
}
clean_db_shards.erase(sh_it++);
}
for (sh_it = new_shards.begin(); sh_it != new_shards.end(); sh_it++)
{
auto & to = clean_db_shards[sh_it->first];
to.swap(sh_it->second);
}
clean_db_settings[pool_id] = (pool_shard_settings_t){
.pg_count = pg_count,
.pg_stripe_size = pg_stripe_size,
};
}
void blockstore_impl_t::process_list(blockstore_op_t *op)
{
uint32_t list_pg = op->offset+1;
uint32_t list_pg = op->offset;
uint32_t pg_count = op->len;
uint64_t pg_stripe_size = op->oid.stripe;
uint64_t min_inode = op->oid.inode;
uint64_t max_inode = op->version;
// Check PG
if (pg_count != 0 && (pg_stripe_size < MIN_BLOCK_SIZE || list_pg > pg_count))
if (pg_count != 0 && (pg_stripe_size < MIN_BLOCK_SIZE || list_pg >= pg_count))
{
op->retval = -EINVAL;
FINISH_OP(op);
return;
}
// Check if the DB needs resharding
// (we don't know about PGs from the beginning, we only create "shards" here)
uint64_t first_shard = 0, last_shard = UINT64_MAX;
if (min_inode != 0 &&
// Check if min_inode == max_inode == pool_id<<N, i.e. this is a pool listing
(min_inode >> (64-POOL_ID_BITS)) == (max_inode >> (64-POOL_ID_BITS)))
{
pool_id_t pool_id = (min_inode >> (64-POOL_ID_BITS));
if (pg_count > 1)
{
// Per-pg listing
auto sh_it = clean_db_settings.find(pool_id);
if (sh_it == clean_db_settings.end() ||
sh_it->second.pg_count != pg_count ||
sh_it->second.pg_stripe_size != pg_stripe_size)
{
reshard_clean_db(pool_id, pg_count, pg_stripe_size);
}
first_shard = last_shard = ((uint64_t)pool_id << (64-POOL_ID_BITS)) | list_pg;
}
else
{
// Per-pool listing
first_shard = ((uint64_t)pool_id << (64-POOL_ID_BITS));
last_shard = ((uint64_t)(pool_id+1) << (64-POOL_ID_BITS)) - 1;
}
}
// Copy clean_db entries
int stable_count = 0, stable_alloc = 0;
if (min_inode != max_inode)
{
for (auto shard_it = clean_db_shards.lower_bound(first_shard);
shard_it != clean_db_shards.end() && shard_it->first <= last_shard;
shard_it++)
{
auto & clean_db = shard_it->second;
stable_alloc += clean_db.size();
}
}
else
{
stable_alloc = 32768;
}
// Copy clean_db entries (sorted)
int stable_count = 0, stable_alloc = clean_db.size() / (pg_count ? pg_count : 1);
obj_ver_id *stable = (obj_ver_id*)malloc(sizeof(obj_ver_id) * stable_alloc);
if (!stable)
{
@@ -533,11 +446,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
FINISH_OP(op);
return;
}
for (auto shard_it = clean_db_shards.lower_bound(first_shard);
shard_it != clean_db_shards.end() && shard_it->first <= last_shard;
shard_it++)
{
auto & clean_db = shard_it->second;
auto clean_it = clean_db.begin(), clean_end = clean_db.end();
if ((min_inode != 0 || max_inode != 0) && min_inode <= max_inode)
{
@@ -552,28 +461,26 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
}
for (; clean_it != clean_end; clean_it++)
{
if (stable_count >= stable_alloc)
if (!pg_count || ((clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
{
stable_alloc *= 2;
stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
if (!stable)
if (stable_count >= stable_alloc)
{
op->retval = -ENOMEM;
FINISH_OP(op);
return;
stable_alloc += 32768;
stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
if (!stable)
{
op->retval = -ENOMEM;
FINISH_OP(op);
return;
}
}
stable[stable_count++] = {
.oid = clean_it->first,
.version = clean_it->second.version,
};
}
stable[stable_count++] = {
.oid = clean_it->first,
.version = clean_it->second.version,
};
}
}
if (first_shard != last_shard)
{
// If that's not a per-PG listing, sort clean entries
std::sort(stable, stable+stable_count);
}
int clean_stable_count = stable_count;
// Copy dirty_db entries (sorted, too)
int unstable_count = 0, unstable_alloc = 0;
@@ -599,7 +506,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
}
for (; dirty_it != dirty_end; dirty_it++)
{
if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count + 1) == list_pg) // like map_to_pg()
if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
{
if (IS_DELETE(dirty_it->second.state))
{

View File

@@ -204,17 +204,6 @@ typedef std::map<obj_ver_id, dirty_entry> blockstore_dirty_db_t;
#include "blockstore_flush.h"
typedef uint32_t pool_id_t;
typedef uint64_t pool_pg_id_t;
#define POOL_ID_BITS 16
struct pool_shard_settings_t
{
uint32_t pg_count;
uint32_t pg_stripe_size;
};
class blockstore_impl_t
{
/******* OPTIONS *******/
@@ -252,14 +241,11 @@ class blockstore_impl_t
int throttle_target_parallelism = 1;
// Minimum difference in microseconds between target and real execution times to throttle the response
int throttle_threshold_us = 50;
// Maximum number of LIST operations to be processed between
int single_tick_list_limit = 1;
/******* END OF OPTIONS *******/
struct ring_consumer_t ring_consumer;
std::map<pool_id_t, pool_shard_settings_t> clean_db_settings;
std::map<pool_pg_id_t, blockstore_clean_db_t> clean_db_shards;
blockstore_clean_db_t clean_db;
uint8_t *clean_bitmap = NULL;
blockstore_dirty_db_t dirty_db;
std::vector<blockstore_op_t*> submit_queue;
@@ -308,9 +294,6 @@ class blockstore_impl_t
void open_journal();
uint8_t* get_clean_entry_bitmap(uint64_t block_loc, int offset);
blockstore_clean_db_t& clean_db_shard(object_id oid);
void reshard_clean_db(pool_id_t pool_id, uint32_t pg_count, uint32_t pg_stripe_size);
// Journaling
void prepare_journal_sector_write(int sector, blockstore_op_t *op);
void handle_journal_write(ring_data_t *data, uint64_t flush_id);

View File

@@ -222,11 +222,10 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
}
if (entry->oid.inode > 0)
{
auto & clean_db = bs->clean_db_shard(entry->oid);
auto clean_it = clean_db.find(entry->oid);
if (clean_it == clean_db.end() || clean_it->second.version < entry->version)
auto clean_it = bs->clean_db.find(entry->oid);
if (clean_it == bs->clean_db.end() || clean_it->second.version < entry->version)
{
if (clean_it != clean_db.end())
if (clean_it != bs->clean_db.end())
{
// free the previous block
#ifdef BLOCKSTORE_DEBUG
@@ -246,7 +245,7 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
printf("Allocate block (clean entry) %lu: %lx:%lx v%lu\n", done_cnt+i, entry->oid.inode, entry->oid.stripe, entry->version);
#endif
bs->data_alloc->set(done_cnt+i, true);
clean_db[entry->oid] = (struct clean_entry){
bs->clean_db[entry->oid] = (struct clean_entry){
.version = entry->version,
.location = (done_cnt+i) << block_order,
};
@@ -657,9 +656,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
init_write_sector = proc_pos;
return 0;
}
auto & clean_db = bs->clean_db_shard(je->small_write.oid);
auto clean_it = clean_db.find(je->small_write.oid);
if (clean_it == clean_db.end() ||
auto clean_it = bs->clean_db.find(je->small_write.oid);
if (clean_it == bs->clean_db.end() ||
clean_it->second.version < je->small_write.version)
{
obj_ver_id ov = {
@@ -737,9 +735,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
erase_dirty_object(dirty_it);
}
}
auto & clean_db = bs->clean_db_shard(je->big_write.oid);
auto clean_it = clean_db.find(je->big_write.oid);
if (clean_it == clean_db.end() ||
auto clean_it = bs->clean_db.find(je->big_write.oid);
if (clean_it == bs->clean_db.end() ||
clean_it->second.version < je->big_write.version)
{
// oid, version, block
@@ -844,9 +841,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
dirty_it--;
dirty_exists = dirty_it->first.oid == je->del.oid;
}
auto & clean_db = bs->clean_db_shard(je->del.oid);
auto clean_it = clean_db.find(je->del.oid);
bool clean_exists = (clean_it != clean_db.end() &&
auto clean_it = bs->clean_db.find(je->del.oid);
bool clean_exists = (clean_it != bs->clean_db.end() &&
clean_it->second.version < je->del.version);
if (!clean_exists && dirty_exists)
{
@@ -905,9 +901,8 @@ void blockstore_init_journal::erase_dirty_object(blockstore_dirty_db_t::iterator
break;
}
}
auto & clean_db = bs->clean_db_shard(oid);
auto clean_it = clean_db.find(oid);
uint64_t clean_loc = clean_it != clean_db.end()
auto clean_it = bs->clean_db.find(oid);
uint64_t clean_loc = clean_it != bs->clean_db.end()
? clean_it->second.location : UINT64_MAX;
if (exists && clean_loc == UINT64_MAX)
{

View File

@@ -111,7 +111,6 @@ uint8_t* blockstore_impl_t::get_clean_entry_bitmap(uint64_t block_loc, int offse
int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
{
auto & clean_db = clean_db_shard(read_op->oid);
auto clean_it = clean_db.find(read_op->oid);
auto dirty_it = dirty_db.upper_bound((obj_ver_id){
.oid = read_op->oid,
@@ -298,7 +297,6 @@ int blockstore_impl_t::read_bitmap(object_id oid, uint64_t target_version, void
dirty_it--;
}
}
auto & clean_db = clean_db_shard(oid);
auto clean_it = clean_db.find(oid);
if (clean_it != clean_db.end())
{

View File

@@ -54,7 +54,6 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
auto dirty_it = dirty_db.find(*v);
if (dirty_it == dirty_db.end())
{
auto & clean_db = clean_db_shard(v->oid);
auto clean_it = clean_db.find(v->oid);
if (clean_it == clean_db.end() || clean_it->second.version < v->version)
{
@@ -189,7 +188,6 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty)
}
if (exists == -1)
{
auto & clean_db = clean_db_shard(v.oid);
auto clean_it = clean_db.find(v.oid);
exists = clean_it != clean_db.end() ? 1 : 0;
}
@@ -217,7 +215,6 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty)
break;
}
}
auto & clean_db = clean_db_shard(v.oid);
auto clean_it = clean_db.find(v.oid);
uint64_t clean_loc = clean_it != clean_db.end()
? clean_it->second.location : UINT64_MAX;

View File

@@ -41,7 +41,6 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
}
if (!found)
{
auto & clean_db = clean_db_shard(op->oid);
auto clean_it = clean_db.find(op->oid);
if (clean_it != clean_db.end())
{

View File

@@ -154,7 +154,7 @@ resume_1:
if (pool_it != parent->cli->st_cli.pool_config.end())
{
auto & pool_cfg = pool_it->second;
used_size = used_size / (pool_pg_real_size[pool_id] ? pool_pg_real_size[pool_id] : 1)
used_size = used_size / pool_pg_real_size[pool_id]
* (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
}
auto stat_it = stats.find(inode_num);

View File

@@ -1,566 +1,211 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
#include <fcntl.h>
#include "cli.h"
#include "cluster_client.h"
#include "base64.h"
// Remove layer(s): similar to merge, but alters metadata and processes multiple merge targets
//
// Exactly one child of the requested layers may be merged using the "inverted" workflow,
// where we merge it "down" into one of the "to-be-removed" layers and then rename the
// "to-be-removed" layer to the child. It may be done either if all writers are stopped
// before trying to delete layers (which is signaled by --writers-stopped) or if that child
// is a read-only layer (snapshot) itself.
//
// This "inverted" workflow trades copying data of one of the deleted layers for copying
// data of one child of the chain which is also a child of the "traded" layer. So we
// choose the (parent,child) pair which has the largest difference between "parent" and
// "child" inode sizes.
//
// All other children of the chain are processed by iterating though them, merging removed
// parents into them and rebasing them to the last layer which isn't a member of the removed
// chain.
//
// Example:
//
// <parent> - <from> - <layer 2> - <to> - <child 1>
// \ \ \- <child 2>
// \ \- <child 3>
// \-<child 4>
//
// 1) Find optimal pair for the "reverse" scenario
// Imagine that it's (<layer 2>, <child 1>) in this example
// 2) Process all children except <child 1>:
// - Merge <from>..<to> to <child 2>
// - Set <child 2> parent to <parent>
// - Repeat for others
// 3) Process <child 1>:
// - Merge <from>..<child 1> to <layer 2>
// - Set <layer 2> parent to <parent>
// - Rename <layer 2> to <child 1>
// 4) Delete other layers of the chain (<from>, <to>)
struct snap_remover_t
#define RM_LISTING 1
#define RM_REMOVING 2
#define RM_END 3
struct rm_pg_t
{
cli_tool_t *parent;
// remove from..to
std::string from_name, to_name;
// writers are stopped, we can safely change writable layers
bool writers_stopped = false;
// use CAS writes (0 = never, 1 = auto, 2 = always)
int use_cas = 1;
// interval between fsyncs
int fsync_interval = 128;
std::map<inode_t,int> sources;
std::map<inode_t,uint64_t> inode_used;
std::vector<inode_t> merge_children;
std::vector<inode_t> chain_list;
std::map<inode_t,int> inverse_candidates;
inode_t inverse_parent = 0, inverse_child = 0;
inode_t new_parent = 0;
pg_num_t pg_num;
osd_num_t rm_osd_num;
std::set<object_id> objects;
std::set<object_id>::iterator obj_pos;
uint64_t obj_count = 0, obj_done = 0;
int state = 0;
int current_child = 0;
std::function<bool(void)> cb;
int in_flight = 0;
};
bool is_done()
struct rm_inode_t
{
uint64_t inode = 0;
pool_id_t pool_id = 0;
uint64_t min_offset = 0;
cli_tool_t *parent = NULL;
inode_list_t *lister = NULL;
std::vector<rm_pg_t*> lists;
uint64_t total_count = 0, total_done = 0, total_prev_pct = 0;
uint64_t pgs_to_list = 0;
bool lists_done = false;
int state = 0;
void start_delete()
{
return state == 9;
lister = parent->cli->list_inode_start(inode, [this](inode_list_t *lst,
std::set<object_id>&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)
{
rm_pg_t *rm = new rm_pg_t((rm_pg_t){
.pg_num = pg_num,
.rm_osd_num = primary_osd,
.objects = objects,
.obj_count = objects.size(),
.obj_done = 0,
});
if (min_offset == 0)
{
total_count += objects.size();
}
else
{
for (object_id oid: objects)
{
if (oid.stripe >= min_offset)
{
total_count++;
}
}
}
rm->obj_pos = rm->objects.begin();
lists.push_back(rm);
if (parent->list_first)
{
parent->cli->list_inode_next(lister, 1);
}
if (status & INODE_LIST_DONE)
{
lists_done = true;
}
pgs_to_list--;
continue_delete();
});
if (!lister)
{
fprintf(stderr, "Failed to list inode %lu from pool %u objects\n", INODE_NO_POOL(inode), INODE_POOL(inode));
exit(1);
}
pgs_to_list = parent->cli->list_pg_count(lister);
parent->cli->list_inode_next(lister, parent->parallel_osds);
}
void loop()
void send_ops(rm_pg_t *cur_list)
{
if (state == 1)
goto resume_1;
if (parent->cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) ==
parent->cli->msgr.osd_peer_fds.end())
{
// Initiate connection
parent->cli->msgr.connect_peer(cur_list->rm_osd_num, parent->cli->st_cli.peer_states[cur_list->rm_osd_num]);
return;
}
while (cur_list->in_flight < parent->iodepth && cur_list->obj_pos != cur_list->objects.end())
{
if (cur_list->obj_pos->stripe >= min_offset)
{
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
op->peer_fd = parent->cli->msgr.osd_peer_fds[cur_list->rm_osd_num];
op->req = (osd_any_op_t){
.rw = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = parent->cli->next_op_id(),
.opcode = OSD_OP_DELETE,
},
.inode = cur_list->obj_pos->inode,
.offset = cur_list->obj_pos->stripe,
.len = 0,
},
};
op->callback = [this, cur_list](osd_op_t *op)
{
cur_list->in_flight--;
if (op->reply.hdr.retval < 0)
{
fprintf(stderr, "Failed to remove object %lx:%lx from PG %u (OSD %lu) (retval=%ld)\n",
op->req.rw.inode, op->req.rw.offset,
cur_list->pg_num, cur_list->rm_osd_num, op->reply.hdr.retval);
}
delete op;
cur_list->obj_done++;
total_done++;
continue_delete();
};
cur_list->in_flight++;
parent->cli->msgr.outbox_push(op);
}
cur_list->obj_pos++;
}
}
void continue_delete()
{
if (parent->list_first && !lists_done)
{
return;
}
for (int i = 0; i < lists.size(); i++)
{
if (!lists[i]->in_flight && lists[i]->obj_pos == lists[i]->objects.end())
{
delete lists[i];
lists.erase(lists.begin()+i, lists.begin()+i+1);
i--;
if (!lists_done)
{
parent->cli->list_inode_next(lister, 1);
}
}
else
{
send_ops(lists[i]);
}
}
if (parent->progress && total_count > 0 && total_done*1000/total_count != total_prev_pct)
{
printf("\rRemoved %lu/%lu objects, %lu more PGs to list...", total_done, total_count, pgs_to_list);
total_prev_pct = total_done*1000/total_count;
}
if (lists_done && !lists.size())
{
printf("Done, inode %lu in pool %u data removed\n", INODE_NO_POOL(inode), pool_id);
state = 2;
}
}
bool loop()
{
if (state == 0)
{
start_delete();
state = 1;
}
else if (state == 1)
{
continue_delete();
}
else if (state == 2)
goto resume_2;
else if (state == 3)
goto resume_3;
else if (state == 4)
goto resume_4;
else if (state == 5)
goto resume_5;
else if (state == 6)
goto resume_6;
else if (state == 7)
goto resume_7;
else if (state == 8)
goto resume_8;
else if (state == 9)
goto resume_9;
// Get children to merge
get_merge_children();
// Try to select an inode for the "inverse" optimized scenario
// Read statistics from etcd to do it
read_stats();
state = 1;
resume_1:
if (parent->waiting > 0)
return;
choose_inverse_candidate();
// Merge children one by one, except our "inverse" child
for (current_child = 0; current_child < merge_children.size(); current_child++)
{
if (merge_children[current_child] == inverse_child)
continue;
start_merge_child(merge_children[current_child], merge_children[current_child]);
resume_2:
while (!cb())
{
state = 2;
return;
}
cb = NULL;
parent->change_parent(merge_children[current_child], new_parent);
state = 3;
resume_3:
if (parent->waiting > 0)
return;
return true;
}
// Merge our "inverse" child into our "inverse" parent
if (inverse_child != 0)
{
start_merge_child(inverse_child, inverse_parent);
resume_4:
while (!cb())
{
state = 4;
return;
}
cb = NULL;
// Delete "inverse" child data
start_delete_source(inverse_child);
resume_5:
while (!cb())
{
state = 5;
return;
}
cb = NULL;
// Delete "inverse" child metadata, rename parent over it,
// and also change parent links of the previous "inverse" child
rename_inverse_parent();
state = 6;
resume_6:
if (parent->waiting > 0)
return;
}
// Delete parents, except the "inverse" one
for (current_child = 0; current_child < chain_list.size(); current_child++)
{
if (chain_list[current_child] == inverse_parent)
continue;
start_delete_source(chain_list[current_child]);
resume_7:
while (!cb())
{
state = 7;
return;
}
cb = NULL;
delete_inode_config(chain_list[current_child]);
state = 8;
resume_8:
if (parent->waiting > 0)
return;
}
state = 9;
resume_9:
// Done
return;
}
void get_merge_children()
{
// Get all children of from..to
inode_config_t *from_cfg = parent->get_inode_cfg(from_name);
inode_config_t *to_cfg = parent->get_inode_cfg(to_name);
// Check that to_cfg is actually a child of from_cfg
// FIXME de-copypaste the following piece of code with snap_merger_t
inode_config_t *cur = to_cfg;
chain_list.push_back(cur->num);
while (cur->num != from_cfg->num && cur->parent_id != 0)
{
auto it = parent->cli->st_cli.inode_config.find(cur->parent_id);
if (it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Parent inode of layer %s (id %ld) not found\n", cur->name.c_str(), cur->parent_id);
exit(1);
}
cur = &it->second;
chain_list.push_back(cur->num);
}
if (cur->num != from_cfg->num)
{
fprintf(stderr, "Layer %s is not a child of %s\n", to_name.c_str(), from_name.c_str());
exit(1);
}
new_parent = from_cfg->parent_id;
// Calculate ranks
int i = chain_list.size()-1;
for (inode_t item: chain_list)
{
sources[item] = i--;
}
for (auto & ic: parent->cli->st_cli.inode_config)
{
if (!ic.second.parent_id)
{
continue;
}
auto it = sources.find(ic.second.parent_id);
if (it != sources.end() && sources.find(ic.second.num) == sources.end())
{
merge_children.push_back(ic.second.num);
if (ic.second.readonly || writers_stopped)
{
inverse_candidates[ic.second.num] = it->second;
}
}
}
}
void read_stats()
{
if (inverse_candidates.size() == 0)
{
return;
}
json11::Json::array reads;
for (auto cp: inverse_candidates)
{
inode_t inode = cp.first;
reads.push_back(json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
"/"+std::to_string(INODE_NO_POOL(inode))
) },
} }
});
}
for (auto cp: sources)
{
inode_t inode = cp.first;
reads.push_back(json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
"/"+std::to_string(INODE_NO_POOL(inode))
) },
} }
});
}
parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "success", reads },
}, [this](std::string err, json11::Json data)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error reading layer statistics from etcd: %s\n", err.c_str());
exit(1);
}
for (auto inode_result: data["responses"].array_items())
{
auto kv = parent->cli->st_cli.parse_etcd_kv(inode_result["kvs"][0]);
pool_id_t pool_id = 0;
inode_t inode = 0;
char null_byte = 0;
sscanf(kv.key.c_str() + parent->cli->st_cli.etcd_prefix.length()+13, "%u/%lu%c", &pool_id, &inode, &null_byte);
if (!inode || null_byte != 0)
{
fprintf(stderr, "Bad key returned from etcd: %s\n", kv.key.c_str());
exit(1);
}
auto pool_cfg_it = parent->cli->st_cli.pool_config.find(pool_id);
if (pool_cfg_it == parent->cli->st_cli.pool_config.end())
{
fprintf(stderr, "Pool %u does not exist\n", pool_id);
exit(1);
}
inode = INODE_WITH_POOL(pool_id, inode);
auto & pool_cfg = pool_cfg_it->second;
uint64_t used_bytes = kv.value["raw_used"].uint64_value() / pool_cfg.pg_size;
if (pool_cfg.scheme != POOL_SCHEME_REPLICATED)
{
used_bytes *= (pool_cfg.pg_size - pool_cfg.parity_chunks);
}
inode_used[inode] = used_bytes;
}
parent->ringloop->wakeup();
});
}
void choose_inverse_candidate()
{
uint64_t max_diff = 0;
for (auto cp: inverse_candidates)
{
inode_t child = cp.first;
uint64_t child_used = inode_used[child];
int rank = cp.second;
for (int i = chain_list.size()-rank; i < chain_list.size(); i++)
{
inode_t parent = chain_list[i];
uint64_t parent_used = inode_used[parent];
if (parent_used > child_used && (!max_diff || max_diff < (parent_used-child_used)))
{
max_diff = (parent_used-child_used);
inverse_parent = parent;
inverse_child = child;
}
}
}
}
void rename_inverse_parent()
{
auto child_it = parent->cli->st_cli.inode_config.find(inverse_child);
if (child_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inverse_child);
exit(1);
}
auto target_it = parent->cli->st_cli.inode_config.find(inverse_parent);
if (target_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inverse_parent);
exit(1);
}
inode_config_t *child_cfg = &child_it->second;
inode_config_t *target_cfg = &target_it->second;
std::string child_name = child_cfg->name;
std::string target_name = target_cfg->name;
std::string child_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(inverse_child))+
"/"+std::to_string(INODE_NO_POOL(inverse_child))
);
std::string target_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(inverse_parent))+
"/"+std::to_string(INODE_NO_POOL(inverse_parent))
);
// Fill new configuration
inode_config_t new_cfg = *child_cfg;
new_cfg.num = target_cfg->num;
new_cfg.parent_id = new_parent;
json11::Json::array cmp = json11::Json::array {
json11::Json::object {
{ "target", "MOD" },
{ "key", child_cfg_key },
{ "result", "LESS" },
{ "mod_revision", child_cfg->mod_revision+1 },
},
json11::Json::object {
{ "target", "MOD" },
{ "key", target_cfg_key },
{ "result", "LESS" },
{ "mod_revision", target_cfg->mod_revision+1 },
},
};
json11::Json::array txn = json11::Json::array {
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", child_cfg_key },
} },
},
json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", target_cfg_key },
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&new_cfg)).dump()) },
} },
},
json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/index/image/"+child_cfg->name) },
{ "value", base64_encode(json11::Json({
{ "id", INODE_NO_POOL(inverse_parent) },
{ "pool_id", (uint64_t)INODE_POOL(inverse_parent) },
}).dump()) },
} },
},
};
// Reparent children of inverse_child
for (auto & cp: parent->cli->st_cli.inode_config)
{
if (cp.second.parent_id == child_cfg->num)
{
auto cp_cfg = cp.second;
cp_cfg.parent_id = inverse_parent;
auto cp_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(cp.second.num))+
"/"+std::to_string(INODE_NO_POOL(cp.second.num))
);
cmp.push_back(json11::Json::object {
{ "target", "MOD" },
{ "key", cp_key },
{ "result", "LESS" },
{ "mod_revision", cp.second.mod_revision+1 },
});
txn.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", cp_key },
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&cp_cfg)).dump()) },
} },
});
}
}
parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "compare", cmp },
{ "success", txn },
}, [this, target_name, child_name](std::string err, json11::Json res)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error renaming %s to %s: %s\n", target_name.c_str(), child_name.c_str(), err.c_str());
exit(1);
}
if (!res["succeeded"].bool_value())
{
fprintf(
stderr, "Parent (%s), child (%s), or one of its children"
" configuration was modified during rename\n", target_name.c_str(), child_name.c_str()
);
exit(1);
}
printf("Layer %s renamed to %s\n", target_name.c_str(), child_name.c_str());
parent->ringloop->wakeup();
});
}
void delete_inode_config(inode_t cur)
{
auto cur_cfg_it = parent->cli->st_cli.inode_config.find(cur);
if (cur_cfg_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode 0x%lx disappeared\n", cur);
exit(1);
}
inode_config_t *cur_cfg = &cur_cfg_it->second;
std::string cur_name = cur_cfg->name;
std::string cur_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(cur))+
"/"+std::to_string(INODE_NO_POOL(cur))
);
parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "compare", json11::Json::array {
json11::Json::object {
{ "target", "MOD" },
{ "key", cur_cfg_key },
{ "result", "LESS" },
{ "mod_revision", cur_cfg->mod_revision+1 },
},
} },
{ "success", json11::Json::array {
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", cur_cfg_key },
} },
},
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/index/image/"+cur_name) },
} },
},
} },
}, [this, cur_name](std::string err, json11::Json res)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error deleting %s: %s\n", cur_name.c_str(), err.c_str());
exit(1);
}
if (!res["succeeded"].bool_value())
{
fprintf(stderr, "Layer %s configuration was modified during deletion\n", cur_name.c_str());
exit(1);
}
printf("Layer %s deleted\n", cur_name.c_str());
parent->ringloop->wakeup();
});
}
void start_merge_child(inode_t child_inode, inode_t target_inode)
{
auto child_it = parent->cli->st_cli.inode_config.find(child_inode);
if (child_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", child_inode);
exit(1);
}
auto target_it = parent->cli->st_cli.inode_config.find(target_inode);
if (target_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", target_inode);
exit(1);
}
cb = parent->start_merge(json11::Json::object {
{ "command", json11::Json::array{ "merge-data", from_name, child_it->second.name } },
{ "target", target_it->second.name },
{ "delete-source", false },
{ "cas", use_cas },
{ "fsync-interval", fsync_interval },
});
}
void start_delete_source(inode_t inode)
{
auto source = parent->cli->st_cli.inode_config.find(inode);
if (source == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inode);
exit(1);
}
cb = parent->start_rm(json11::Json::object {
{ "inode", inode },
{ "pool", (uint64_t)INODE_POOL(inode) },
{ "fsync-interval", fsync_interval },
});
return false;
}
};
std::function<bool(void)> cli_tool_t::start_snap_rm(json11::Json cfg)
std::function<bool(void)> cli_tool_t::start_rm(json11::Json cfg)
{
json11::Json::array cmd = cfg["command"].array_items();
auto snap_remover = new snap_remover_t();
snap_remover->parent = this;
snap_remover->from_name = cmd.size() > 1 ? cmd[1].string_value() : "";
snap_remover->to_name = cmd.size() > 2 ? cmd[2].string_value() : "";
if (snap_remover->from_name == "")
auto remover = new rm_inode_t();
remover->parent = this;
remover->inode = cfg["inode"].uint64_value();
remover->pool_id = cfg["pool"].uint64_value();
if (remover->pool_id)
{
fprintf(stderr, "Layer to remove argument is missing\n");
remover->inode = (remover->inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (((uint64_t)remover->pool_id) << (64-POOL_ID_BITS));
}
remover->pool_id = INODE_POOL(remover->inode);
if (!remover->pool_id)
{
fprintf(stderr, "pool is missing\n");
exit(1);
}
if (snap_remover->to_name == "")
remover->min_offset = cfg["min-offset"].uint64_value();
return [remover]()
{
snap_remover->to_name = snap_remover->from_name;
}
snap_remover->fsync_interval = cfg["fsync-interval"].uint64_value();
if (!snap_remover->fsync_interval)
snap_remover->fsync_interval = 128;
if (!cfg["cas"].is_null())
snap_remover->use_cas = cfg["cas"].uint64_value() ? 2 : 0;
if (!cfg["writers_stopped"].is_null())
snap_remover->writers_stopped = true;
return [snap_remover]()
{
snap_remover->loop();
if (snap_remover->is_done())
if (remover->loop())
{
delete snap_remover;
delete remover;
return true;
}
return false;

View File

@@ -1,214 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
#include "cli.h"
#include "cluster_client.h"
#define RM_LISTING 1
#define RM_REMOVING 2
#define RM_END 3
struct rm_pg_t
{
pg_num_t pg_num;
osd_num_t rm_osd_num;
std::set<object_id> objects;
std::set<object_id>::iterator obj_pos;
uint64_t obj_count = 0, obj_done = 0;
int state = 0;
int in_flight = 0;
};
struct rm_inode_t
{
uint64_t inode = 0;
pool_id_t pool_id = 0;
uint64_t min_offset = 0;
cli_tool_t *parent = NULL;
inode_list_t *lister = NULL;
std::vector<rm_pg_t*> lists;
uint64_t total_count = 0, total_done = 0, total_prev_pct = 0;
uint64_t pgs_to_list = 0;
bool lists_done = false;
int state = 0;
void start_delete()
{
lister = parent->cli->list_inode_start(inode, [this](inode_list_t *lst,
std::set<object_id>&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)
{
rm_pg_t *rm = new rm_pg_t((rm_pg_t){
.pg_num = pg_num,
.rm_osd_num = primary_osd,
.objects = objects,
.obj_count = objects.size(),
.obj_done = 0,
});
if (min_offset == 0)
{
total_count += objects.size();
}
else
{
for (object_id oid: objects)
{
if (oid.stripe >= min_offset)
{
total_count++;
}
}
}
rm->obj_pos = rm->objects.begin();
lists.push_back(rm);
if (parent->list_first)
{
parent->cli->list_inode_next(lister, 1);
}
if (status & INODE_LIST_DONE)
{
lists_done = true;
}
pgs_to_list--;
continue_delete();
});
if (!lister)
{
fprintf(stderr, "Failed to list inode %lu from pool %u objects\n", INODE_NO_POOL(inode), INODE_POOL(inode));
exit(1);
}
pgs_to_list = parent->cli->list_pg_count(lister);
parent->cli->list_inode_next(lister, parent->parallel_osds);
}
void send_ops(rm_pg_t *cur_list)
{
if (parent->cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) ==
parent->cli->msgr.osd_peer_fds.end())
{
// Initiate connection
parent->cli->msgr.connect_peer(cur_list->rm_osd_num, parent->cli->st_cli.peer_states[cur_list->rm_osd_num]);
return;
}
while (cur_list->in_flight < parent->iodepth && cur_list->obj_pos != cur_list->objects.end())
{
if (cur_list->obj_pos->stripe >= min_offset)
{
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
// Already checked that it exists above, but anyway
op->peer_fd = parent->cli->msgr.osd_peer_fds.at(cur_list->rm_osd_num);
op->req = (osd_any_op_t){
.rw = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = parent->cli->next_op_id(),
.opcode = OSD_OP_DELETE,
},
.inode = cur_list->obj_pos->inode,
.offset = cur_list->obj_pos->stripe,
.len = 0,
},
};
op->callback = [this, cur_list](osd_op_t *op)
{
cur_list->in_flight--;
if (op->reply.hdr.retval < 0)
{
fprintf(stderr, "Failed to remove object %lx:%lx from PG %u (OSD %lu) (retval=%ld)\n",
op->req.rw.inode, op->req.rw.offset,
cur_list->pg_num, cur_list->rm_osd_num, op->reply.hdr.retval);
}
delete op;
cur_list->obj_done++;
total_done++;
continue_delete();
};
cur_list->in_flight++;
parent->cli->msgr.outbox_push(op);
}
cur_list->obj_pos++;
}
}
void continue_delete()
{
if (parent->list_first && !lists_done)
{
return;
}
for (int i = 0; i < lists.size(); i++)
{
if (!lists[i]->in_flight && lists[i]->obj_pos == lists[i]->objects.end())
{
delete lists[i];
lists.erase(lists.begin()+i, lists.begin()+i+1);
i--;
if (!lists_done)
{
parent->cli->list_inode_next(lister, 1);
}
}
else
{
send_ops(lists[i]);
}
}
if (parent->progress && total_count > 0 && total_done*1000/total_count != total_prev_pct)
{
printf("\rRemoved %lu/%lu objects, %lu more PGs to list...", total_done, total_count, pgs_to_list);
total_prev_pct = total_done*1000/total_count;
}
if (lists_done && !lists.size())
{
printf("Done, inode %lu in pool %u data removed\n", INODE_NO_POOL(inode), pool_id);
state = 2;
}
}
bool loop()
{
if (state == 0)
{
start_delete();
state = 1;
}
else if (state == 1)
{
continue_delete();
}
else if (state == 2)
{
return true;
}
return false;
}
};
std::function<bool(void)> cli_tool_t::start_rm(json11::Json cfg)
{
auto remover = new rm_inode_t();
remover->parent = this;
remover->inode = cfg["inode"].uint64_value();
remover->pool_id = cfg["pool"].uint64_value();
if (remover->pool_id)
{
remover->inode = (remover->inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (((uint64_t)remover->pool_id) << (64-POOL_ID_BITS));
}
remover->pool_id = INODE_POOL(remover->inode);
if (!remover->pool_id)
{
fprintf(stderr, "pool is missing\n");
exit(1);
}
remover->min_offset = cfg["min-offset"].uint64_value();
return [remover]()
{
if (remover->loop())
{
delete remover;
return true;
}
return false;
};
}

568
src/cli_snap_rm.cpp Normal file
View File

@@ -0,0 +1,568 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
#include <fcntl.h>
#include "cli.h"
#include "cluster_client.h"
#include "base64.h"
// Remove layer(s): similar to merge, but alters metadata and processes multiple merge targets
//
// Exactly one child of the requested layers may be merged using the "inverted" workflow,
// where we merge it "down" into one of the "to-be-removed" layers and then rename the
// "to-be-removed" layer to the child. It may be done either if all writers are stopped
// before trying to delete layers (which is signaled by --writers-stopped) or if that child
// is a read-only layer (snapshot) itself.
//
// This "inverted" workflow trades copying data of one of the deleted layers for copying
// data of one child of the chain which is also a child of the "traded" layer. So we
// choose the (parent,child) pair which has the largest difference between "parent" and
// "child" inode sizes.
//
// All other children of the chain are processed by iterating though them, merging removed
// parents into them and rebasing them to the last layer which isn't a member of the removed
// chain.
//
// Example:
//
// <parent> - <from> - <layer 2> - <to> - <child 1>
// \ \ \- <child 2>
// \ \- <child 3>
// \-<child 4>
//
// 1) Find optimal pair for the "reverse" scenario
// Imagine that it's (<layer 2>, <child 1>) in this example
// 2) Process all children except <child 1>:
// - Merge <from>..<to> to <child 2>
// - Set <child 2> parent to <parent>
// - Repeat for others
// 3) Process <child 1>:
// - Merge <from>..<child 1> to <layer 2>
// - Set <layer 2> parent to <parent>
// - Rename <layer 2> to <child 1>
// 4) Delete other layers of the chain (<from>, <to>)
struct snap_remover_t
{
cli_tool_t *parent;
// remove from..to
std::string from_name, to_name;
// writers are stopped, we can safely change writable layers
bool writers_stopped = false;
// use CAS writes (0 = never, 1 = auto, 2 = always)
int use_cas = 1;
// interval between fsyncs
int fsync_interval = 128;
std::map<inode_t,int> sources;
std::map<inode_t,uint64_t> inode_used;
std::vector<inode_t> merge_children;
std::vector<inode_t> chain_list;
std::map<inode_t,int> inverse_candidates;
inode_t inverse_parent = 0, inverse_child = 0;
inode_t new_parent = 0;
int state = 0;
int current_child = 0;
std::function<bool(void)> cb;
bool is_done()
{
return state == 9;
}
void loop()
{
if (state == 1)
goto resume_1;
else if (state == 2)
goto resume_2;
else if (state == 3)
goto resume_3;
else if (state == 4)
goto resume_4;
else if (state == 5)
goto resume_5;
else if (state == 6)
goto resume_6;
else if (state == 7)
goto resume_7;
else if (state == 8)
goto resume_8;
else if (state == 9)
goto resume_9;
// Get children to merge
get_merge_children();
// Try to select an inode for the "inverse" optimized scenario
// Read statistics from etcd to do it
read_stats();
state = 1;
resume_1:
if (parent->waiting > 0)
return;
choose_inverse_candidate();
// Merge children one by one, except our "inverse" child
for (current_child = 0; current_child < merge_children.size(); current_child++)
{
if (merge_children[current_child] == inverse_child)
continue;
start_merge_child(merge_children[current_child], merge_children[current_child]);
resume_2:
while (!cb())
{
state = 2;
return;
}
cb = NULL;
parent->change_parent(merge_children[current_child], new_parent);
state = 3;
resume_3:
if (parent->waiting > 0)
return;
}
// Merge our "inverse" child into our "inverse" parent
if (inverse_child != 0)
{
start_merge_child(inverse_child, inverse_parent);
resume_4:
while (!cb())
{
state = 4;
return;
}
cb = NULL;
// Delete "inverse" child data
start_delete_source(inverse_child);
resume_5:
while (!cb())
{
state = 5;
return;
}
cb = NULL;
// Delete "inverse" child metadata, rename parent over it,
// and also change parent links of the previous "inverse" child
rename_inverse_parent();
state = 6;
resume_6:
if (parent->waiting > 0)
return;
}
// Delete parents, except the "inverse" one
for (current_child = 0; current_child < chain_list.size(); current_child++)
{
if (chain_list[current_child] == inverse_parent)
continue;
start_delete_source(chain_list[current_child]);
resume_7:
while (!cb())
{
state = 7;
return;
}
cb = NULL;
delete_inode_config(chain_list[current_child]);
state = 8;
resume_8:
if (parent->waiting > 0)
return;
}
state = 9;
resume_9:
// Done
return;
}
void get_merge_children()
{
// Get all children of from..to
inode_config_t *from_cfg = parent->get_inode_cfg(from_name);
inode_config_t *to_cfg = parent->get_inode_cfg(to_name);
// Check that to_cfg is actually a child of from_cfg
// FIXME de-copypaste the following piece of code with snap_merger_t
inode_config_t *cur = to_cfg;
chain_list.push_back(cur->num);
while (cur->num != from_cfg->num && cur->parent_id != 0)
{
auto it = parent->cli->st_cli.inode_config.find(cur->parent_id);
if (it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Parent inode of layer %s (id %ld) not found\n", cur->name.c_str(), cur->parent_id);
exit(1);
}
cur = &it->second;
chain_list.push_back(cur->num);
}
if (cur->num != from_cfg->num)
{
fprintf(stderr, "Layer %s is not a child of %s\n", to_name.c_str(), from_name.c_str());
exit(1);
}
new_parent = from_cfg->parent_id;
// Calculate ranks
int i = chain_list.size()-1;
for (inode_t item: chain_list)
{
sources[item] = i--;
}
for (auto & ic: parent->cli->st_cli.inode_config)
{
if (!ic.second.parent_id)
{
continue;
}
auto it = sources.find(ic.second.parent_id);
if (it != sources.end() && sources.find(ic.second.num) == sources.end())
{
merge_children.push_back(ic.second.num);
if (ic.second.readonly || writers_stopped)
{
inverse_candidates[ic.second.num] = it->second;
}
}
}
}
void read_stats()
{
if (inverse_candidates.size() == 0)
{
return;
}
json11::Json::array reads;
for (auto cp: inverse_candidates)
{
inode_t inode = cp.first;
reads.push_back(json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
"/"+std::to_string(INODE_NO_POOL(inode))
) },
} }
});
}
for (auto cp: sources)
{
inode_t inode = cp.first;
reads.push_back(json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(
parent->cli->st_cli.etcd_prefix+
"/inode/stats/"+std::to_string(INODE_POOL(inode))+
"/"+std::to_string(INODE_NO_POOL(inode))
) },
} }
});
}
parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "success", reads },
}, [this](std::string err, json11::Json data)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error reading layer statistics from etcd: %s\n", err.c_str());
exit(1);
}
for (auto inode_result: data["responses"].array_items())
{
auto kv = parent->cli->st_cli.parse_etcd_kv(inode_result["kvs"][0]);
pool_id_t pool_id = 0;
inode_t inode = 0;
char null_byte = 0;
sscanf(kv.key.c_str() + parent->cli->st_cli.etcd_prefix.length()+13, "%u/%lu%c", &pool_id, &inode, &null_byte);
if (!inode || null_byte != 0)
{
fprintf(stderr, "Bad key returned from etcd: %s\n", kv.key.c_str());
exit(1);
}
auto pool_cfg_it = parent->cli->st_cli.pool_config.find(pool_id);
if (pool_cfg_it == parent->cli->st_cli.pool_config.end())
{
fprintf(stderr, "Pool %u does not exist\n", pool_id);
exit(1);
}
inode = INODE_WITH_POOL(pool_id, inode);
auto & pool_cfg = pool_cfg_it->second;
uint64_t used_bytes = kv.value["raw_used"].uint64_value() / pool_cfg.pg_size;
if (pool_cfg.scheme != POOL_SCHEME_REPLICATED)
{
used_bytes *= (pool_cfg.pg_size - pool_cfg.parity_chunks);
}
inode_used[inode] = used_bytes;
}
parent->ringloop->wakeup();
});
}
void choose_inverse_candidate()
{
uint64_t max_diff = 0;
for (auto cp: inverse_candidates)
{
inode_t child = cp.first;
uint64_t child_used = inode_used[child];
int rank = cp.second;
for (int i = chain_list.size()-rank; i < chain_list.size(); i++)
{
inode_t parent = chain_list[i];
uint64_t parent_used = inode_used[parent];
if (parent_used > child_used && (!max_diff || max_diff < (parent_used-child_used)))
{
max_diff = (parent_used-child_used);
inverse_parent = parent;
inverse_child = child;
}
}
}
}
void rename_inverse_parent()
{
auto child_it = parent->cli->st_cli.inode_config.find(inverse_child);
if (child_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inverse_child);
exit(1);
}
auto target_it = parent->cli->st_cli.inode_config.find(inverse_parent);
if (target_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inverse_parent);
exit(1);
}
inode_config_t *child_cfg = &child_it->second;
inode_config_t *target_cfg = &target_it->second;
std::string child_name = child_cfg->name;
std::string target_name = target_cfg->name;
std::string child_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(inverse_child))+
"/"+std::to_string(INODE_NO_POOL(inverse_child))
);
std::string target_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(inverse_parent))+
"/"+std::to_string(INODE_NO_POOL(inverse_parent))
);
// Fill new configuration
inode_config_t new_cfg = *child_cfg;
new_cfg.num = target_cfg->num;
new_cfg.parent_id = new_parent;
json11::Json::array cmp = json11::Json::array {
json11::Json::object {
{ "target", "MOD" },
{ "key", child_cfg_key },
{ "result", "LESS" },
{ "mod_revision", child_cfg->mod_revision+1 },
},
json11::Json::object {
{ "target", "MOD" },
{ "key", target_cfg_key },
{ "result", "LESS" },
{ "mod_revision", target_cfg->mod_revision+1 },
},
};
json11::Json::array txn = json11::Json::array {
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", child_cfg_key },
} },
},
json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", target_cfg_key },
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&new_cfg)).dump()) },
} },
},
json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/index/image/"+child_cfg->name) },
{ "value", base64_encode(json11::Json({
{ "id", INODE_NO_POOL(inverse_parent) },
{ "pool_id", (uint64_t)INODE_POOL(inverse_parent) },
}).dump()) },
} },
},
};
// Reparent children of inverse_child
for (auto & cp: parent->cli->st_cli.inode_config)
{
if (cp.second.parent_id == child_cfg->num)
{
auto cp_cfg = cp.second;
cp_cfg.parent_id = inverse_parent;
auto cp_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(cp.second.num))+
"/"+std::to_string(INODE_NO_POOL(cp.second.num))
);
cmp.push_back(json11::Json::object {
{ "target", "MOD" },
{ "key", cp_key },
{ "result", "LESS" },
{ "mod_revision", cp.second.mod_revision+1 },
});
txn.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", cp_key },
{ "value", base64_encode(json11::Json(parent->cli->st_cli.serialize_inode_cfg(&cp_cfg)).dump()) },
} },
});
}
}
parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "compare", cmp },
{ "success", txn },
}, [this, target_name, child_name](std::string err, json11::Json res)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error renaming %s to %s: %s\n", target_name.c_str(), child_name.c_str(), err.c_str());
exit(1);
}
if (!res["succeeded"].bool_value())
{
fprintf(
stderr, "Parent (%s), child (%s), or one of its children"
" configuration was modified during rename\n", target_name.c_str(), child_name.c_str()
);
exit(1);
}
printf("Layer %s renamed to %s\n", target_name.c_str(), child_name.c_str());
parent->ringloop->wakeup();
});
}
void delete_inode_config(inode_t cur)
{
auto cur_cfg_it = parent->cli->st_cli.inode_config.find(cur);
if (cur_cfg_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode 0x%lx disappeared\n", cur);
exit(1);
}
inode_config_t *cur_cfg = &cur_cfg_it->second;
std::string cur_name = cur_cfg->name;
std::string cur_cfg_key = base64_encode(
parent->cli->st_cli.etcd_prefix+
"/config/inode/"+std::to_string(INODE_POOL(cur))+
"/"+std::to_string(INODE_NO_POOL(cur))
);
parent->waiting++;
parent->cli->st_cli.etcd_txn_slow(json11::Json::object {
{ "compare", json11::Json::array {
json11::Json::object {
{ "target", "MOD" },
{ "key", cur_cfg_key },
{ "result", "LESS" },
{ "mod_revision", cur_cfg->mod_revision+1 },
},
} },
{ "success", json11::Json::array {
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", cur_cfg_key },
} },
},
json11::Json::object {
{ "request_delete_range", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/index/image/"+cur_name) },
} },
},
} },
}, [this, cur_name](std::string err, json11::Json res)
{
parent->waiting--;
if (err != "")
{
fprintf(stderr, "Error deleting %s: %s\n", cur_name.c_str(), err.c_str());
exit(1);
}
if (!res["succeeded"].bool_value())
{
fprintf(stderr, "Layer %s configuration was modified during deletion\n", cur_name.c_str());
exit(1);
}
printf("Layer %s deleted\n", cur_name.c_str());
parent->ringloop->wakeup();
});
}
void start_merge_child(inode_t child_inode, inode_t target_inode)
{
auto child_it = parent->cli->st_cli.inode_config.find(child_inode);
if (child_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", child_inode);
exit(1);
}
auto target_it = parent->cli->st_cli.inode_config.find(target_inode);
if (target_it == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", target_inode);
exit(1);
}
cb = parent->start_merge(json11::Json::object {
{ "command", json11::Json::array{ "merge-data", from_name, child_it->second.name } },
{ "target", target_it->second.name },
{ "delete-source", false },
{ "cas", use_cas },
{ "fsync-interval", fsync_interval },
});
}
void start_delete_source(inode_t inode)
{
auto source = parent->cli->st_cli.inode_config.find(inode);
if (source == parent->cli->st_cli.inode_config.end())
{
fprintf(stderr, "Inode %ld disappeared\n", inode);
exit(1);
}
cb = parent->start_rm(json11::Json::object {
{ "inode", inode },
{ "pool", (uint64_t)INODE_POOL(inode) },
{ "fsync-interval", fsync_interval },
});
}
};
std::function<bool(void)> cli_tool_t::start_snap_rm(json11::Json cfg)
{
json11::Json::array cmd = cfg["command"].array_items();
auto snap_remover = new snap_remover_t();
snap_remover->parent = this;
snap_remover->from_name = cmd.size() > 1 ? cmd[1].string_value() : "";
snap_remover->to_name = cmd.size() > 2 ? cmd[2].string_value() : "";
if (snap_remover->from_name == "")
{
fprintf(stderr, "Layer to remove argument is missing\n");
exit(1);
}
if (snap_remover->to_name == "")
{
snap_remover->to_name = snap_remover->from_name;
}
snap_remover->fsync_interval = cfg["fsync-interval"].uint64_value();
if (!snap_remover->fsync_interval)
snap_remover->fsync_interval = 128;
if (!cfg["cas"].is_null())
snap_remover->use_cas = cfg["cas"].uint64_value() ? 2 : 0;
if (!cfg["writers_stopped"].is_null())
snap_remover->writers_stopped = true;
return [snap_remover]()
{
snap_remover->loop();
if (snap_remover->is_done())
{
delete snap_remover;
return true;
}
return false;
};
}

View File

@@ -143,7 +143,7 @@ void cluster_client_t::calc_wait(cluster_op_t *op)
}
else /* if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP) */
{
for (auto prev = op_queue_head; prev && prev != op; prev = prev->next)
for (auto prev = op->prev; prev; prev = prev->prev)
{
if (prev->opcode == OSD_OP_WRITE && prev->flags & OP_FLUSH_BUFFER)
{
@@ -151,7 +151,7 @@ void cluster_client_t::calc_wait(cluster_op_t *op)
}
else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ || prev->opcode == OSD_OP_READ_BITMAP)
{
// Flushes are always in the beginning (we're scanning from the beginning of the queue)
// Flushes are always in the beginning
break;
}
}
@@ -172,7 +172,6 @@ void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *n
(next->opcode == OSD_OP_READ || next->opcode == OSD_OP_READ_BITMAP) && (flags & OP_FLUSH_BUFFER))
{
next->prev_wait += inc;
assert(next->prev_wait >= 0);
if (!next->prev_wait)
{
if (next->opcode == OSD_OP_SYNC)
@@ -192,7 +191,6 @@ void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *n
if (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE)
{
next->prev_wait += inc;
assert(next->prev_wait >= 0);
if (!next->prev_wait)
{
if (next->opcode == OSD_OP_SYNC)

View File

@@ -200,8 +200,7 @@ void cluster_client_t::send_list(inode_list_osd_t *cur_list)
auto & pool_cfg = st_cli.pool_config[cur_list->pg->lst->pool_id];
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
// Already checked that it exists above, but anyway
op->peer_fd = msgr.osd_peer_fds.at(cur_list->osd_num);
op->peer_fd = msgr.osd_peer_fds[cur_list->osd_num];
op->req = (osd_any_op_t){
.sec_list = {
.header = {

View File

@@ -351,9 +351,9 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
}
else
{
printf("+++ %s 0x%lx 0x%llx+%lx\n",
printf("+++ %s 0x%lx 0x%llx+%llx\n",
io->ddir == DDIR_READ ? "READ" : "WRITE",
(uint64_t)io, io->offset, (uint64_t)io->xfer_buflen);
(uint64_t)io, io->offset, io->xfer_buflen);
}
}

View File

@@ -170,14 +170,14 @@ static int sec_init(struct thread_data *td)
bsd->block_order = o->block_order == 0 ? 17 : o->block_order;
bsd->block_size = 1 << o->block_order;
sockaddr_storage addr;
sockaddr addr;
if (!string_to_addr(std::string(o->host ? o->host : "127.0.0.1"), false, o->port > 0 ? o->port : 11203, &addr))
{
fprintf(stderr, "server address: %s is not valid\n", o->host ? o->host : "127.0.0.1");
return 1;
}
bsd->connect_fd = socket(addr.ss_family, SOCK_STREAM, 0);
bsd->connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (bsd->connect_fd < 0)
{
perror("socket");
@@ -355,7 +355,7 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int
{
if (reply.hdr.retval != io->xfer_buflen)
{
fprintf(stderr, "Short read: retval = %ld instead of %lu\n", reply.hdr.retval, (uint64_t)io->xfer_buflen);
fprintf(stderr, "Short read: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen);
exit(1);
}
// Support bitmap
@@ -380,7 +380,7 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int
{
if (reply.hdr.retval != io->xfer_buflen)
{
fprintf(stderr, "Short write: retval = %ld instead of %lu\n", reply.hdr.retval, (uint64_t)io->xfer_buflen);
fprintf(stderr, "Short write: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen);
exit(1);
}
}

View File

@@ -62,10 +62,9 @@ struct http_co_t
void run_cb_and_clear();
void start_connection();
void close_connection();
void next_request();
void handle_events();
void handle_connect_result();
void submit_read(bool check_timeout);
void submit_read();
void submit_send();
bool handle_read();
void post_message(int type, const std::string & msg);
@@ -129,7 +128,6 @@ void http_co_t::run_cb_and_clear()
// Call callback after clearing it because otherwise we may hit reenterability problems
if (cb != NULL)
cb(&parsed);
next_request();
}
void http_co_t::send_request(const std::string & host, const std::string & request,
@@ -163,6 +161,17 @@ void http_co_t::send_request(const std::string & host, const std::string & reque
this->sent = 0;
this->response_callback = response_callback;
this->parsed = {};
if (request_timeout > 0)
{
timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id)
{
stackin();
close_connection();
parsed = { .error = "HTTP request timed out" };
run_cb_and_clear();
stackout();
});
}
if (state == HTTP_CO_KEEPALIVE)
{
state = HTTP_CO_SENDING_REQUEST;
@@ -172,28 +181,6 @@ void http_co_t::send_request(const std::string & host, const std::string & reque
{
start_connection();
}
// Do it _after_ state assignment because set_timer() can actually trigger
// other timers and requests (reenterability is our friend)
if (request_timeout > 0)
{
timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id)
{
stackin();
if (state == HTTP_CO_REQUEST_SENT)
{
// In case of high CPU load, we may not handle etcd responses in time
// For this case, first check the socket and only then terminate request with the timeout
submit_read(true);
}
else
{
close_connection();
parsed = { .error = "HTTP request timed out" };
run_cb_and_clear();
}
stackout();
});
}
stackout();
}
@@ -284,19 +271,17 @@ void http_co_t::close_connection()
void http_co_t::start_connection()
{
stackin();
struct sockaddr_storage addr;
struct sockaddr addr;
if (!string_to_addr(host.c_str(), 1, 80, &addr))
{
close_connection();
parsed = { .error = "Invalid address: "+host };
run_cb_and_clear();
stackout();
return;
}
peer_fd = socket(addr.ss_family, SOCK_STREAM, 0);
peer_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (peer_fd < 0)
{
close_connection();
parsed = { .error = std::string("socket: ")+strerror(errno) };
run_cb_and_clear();
stackout();
@@ -338,7 +323,7 @@ void http_co_t::handle_events()
epoll_events &= ~EPOLLOUT;
if (epoll_events & EPOLLIN)
{
submit_read(false);
submit_read();
}
else if (epoll_events & (EPOLLRDHUP|EPOLLERR))
{
@@ -425,11 +410,10 @@ again:
stackout();
}
void http_co_t::submit_read(bool check_timeout)
void http_co_t::submit_read()
{
stackin();
int res;
again:
if (rbuf.size() != READ_BUFFER_SIZE)
{
rbuf.resize(READ_BUFFER_SIZE);
@@ -444,22 +428,7 @@ again:
}
if (res == -EAGAIN || res == -EINTR)
{
if (check_timeout)
{
if (res == -EINTR)
goto again;
else
{
// Timeout happened and there is no data to read
close_connection();
parsed = { .error = "HTTP request timed out" };
run_cb_and_clear();
}
}
else
{
epoll_events = epoll_events & ~EPOLLIN;
}
epoll_events = epoll_events & ~EPOLLIN;
}
else if (res <= 0)
{
@@ -532,11 +501,8 @@ bool http_co_t::handle_read()
if (state == HTTP_CO_HEADERS_RECEIVED && target_response_size > 0 && response.size() >= target_response_size)
{
std::swap(parsed.body, response);
if (!keepalive)
close_connection();
else
state = HTTP_CO_KEEPALIVE;
run_cb_and_clear();
response_callback(&parsed);
parsed.eof = true;
}
else if (state == HTTP_CO_CHUNKED && response.size() > 0)
{
@@ -567,14 +533,10 @@ bool http_co_t::handle_read()
response_callback(&parsed);
parsed.body = "";
}
else if (parsed.eof)
if (parsed.eof && !want_streaming)
{
// Normal response
if (!keepalive)
close_connection();
else
state = HTTP_CO_KEEPALIVE;
run_cb_and_clear();
response_callback(&parsed);
}
}
else if (state == HTTP_CO_WEBSOCKET && response.size() > 0)
@@ -585,20 +547,29 @@ bool http_co_t::handle_read()
parsed.body = "";
}
}
if (parsed.eof)
{
response_callback = NULL;
parsed = {};
if (!keepalive)
{
close_connection();
}
else
{
state = HTTP_CO_KEEPALIVE;
if (keepalive_queue.size() > 0)
{
auto next = keepalive_queue[0];
keepalive_queue.erase(keepalive_queue.begin(), keepalive_queue.begin()+1);
next();
}
}
}
stackout();
return true;
}
void http_co_t::next_request()
{
if (keepalive_queue.size() > 0)
{
auto next = keepalive_queue[0];
keepalive_queue.erase(keepalive_queue.begin(), keepalive_queue.begin()+1);
next();
}
}
uint64_t stoull_full(const std::string & str, int base)
{
if (isspace(str[0]))

View File

@@ -222,13 +222,13 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
{
assert(peer_osd != this->osd_num);
struct sockaddr_storage addr;
struct sockaddr addr;
if (!string_to_addr(peer_host, 0, peer_port, &addr))
{
on_connect_peer(peer_osd, -EINVAL);
return;
}
int peer_fd = socket(addr.ss_family, SOCK_STREAM, 0);
int peer_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (peer_fd < 0)
{
on_connect_peer(peer_osd, -errno);
@@ -484,10 +484,10 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
void osd_messenger_t::accept_connections(int listen_fd)
{
// Accept new connections
sockaddr_storage addr;
sockaddr addr;
socklen_t peer_addr_size = sizeof(addr);
int peer_fd;
while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0)
{
assert(peer_fd != 0);
fprintf(stderr, "[OSD %lu] new client %d: connection from %s\n", this->osd_num, peer_fd,

View File

@@ -49,7 +49,7 @@ struct osd_client_t
{
int refs = 0;
sockaddr_storage peer_addr;
sockaddr peer_addr;
int peer_port;
int peer_fd;
int peer_state;

View File

@@ -3,6 +3,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include "msgr_rdma.h"
#include "messenger.h"
@@ -54,6 +55,7 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level)
{
int res;
bool odp = true;
ibv_device **dev_list = NULL;
msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
ctx->mtu = mtu;
@@ -117,9 +119,9 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev));
goto cleanup;
}
if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid))
if ((res = ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid)) != 0)
{
fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index);
fprintf(stderr, "Couldn't read RDMA device %s GID index %d: %s\n", ibv_get_device_name(ctx->dev), gid_index, strerror(res));
goto cleanup;
}
@@ -131,9 +133,9 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
}
{
if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx))
if ((res = ibv_query_device_ex(ctx->context, NULL, &ctx->attrx)) != 0)
{
fprintf(stderr, "Couldn't query RDMA device for its features\n");
fprintf(stderr, "Couldn't query RDMA device %s for its features: %s\n", ibv_get_device_name(ctx->dev), strerror(res));
goto cleanup;
}
if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
@@ -141,15 +143,20 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
!(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
{
fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n");
goto cleanup;
fprintf(stderr, "Warning: RDMA device isn't implicit ODP (On-Demand Paging) capable, trying to lock all application memory\n");
if (mlockall(MCL_CURRENT|MCL_FUTURE|MCL_ONFAULT) != 0)
{
fprintf(stderr, "mlockall() failed: %s\n", strerror(errno));
goto cleanup;
}
odp = false;
}
}
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | (odp ? IBV_ACCESS_ON_DEMAND : 0));
if (!ctx->mr)
{
fprintf(stderr, "Couldn't register RDMA memory region\n");
fprintf(stderr, "Couldn't register RDMA memory region: %s\n", strerror(errno));
goto cleanup;
}

View File

@@ -363,8 +363,7 @@ public:
setsid();
if (fork())
exit(0);
if (chdir("/") != 0)
fprintf(stderr, "Warning: Failed to chdir into /\n");
chdir("/");
close(0);
close(1);
close(2);
@@ -526,11 +525,7 @@ protected:
{
goto end_unmap;
}
r = write(qd_fd, "32768", 5);
if (r != 5)
{
fprintf(stderr, "Warning: Failed to configure max_sectors_kb\n");
}
write(qd_fd, "32768", 5);
close(qd_fd);
if (!fork())
{

View File

@@ -57,11 +57,7 @@ osd_t::osd_t(const json11::Json & config, ring_loop_t *ringloop)
if (this->config["osd_memlock"] == "true" || this->config["osd_memlock"] == "1" || this->config["osd_memlock"] == "yes")
{
// Lock all OSD memory if requested
if (mlockall(MCL_CURRENT|MCL_FUTURE
#ifdef MCL_ONFAULT
| MCL_ONFAULT
#endif
) != 0)
if (mlockall(MCL_CURRENT|MCL_FUTURE|MCL_ONFAULT) != 0)
{
fprintf(stderr, "osd_memlock is set to true, but mlockall() failed: %s\n", strerror(errno));
exit(-1);
@@ -200,7 +196,46 @@ void osd_t::bind_socket()
// FIXME Support multiple listening sockets
listen_fd = create_and_bind_socket(bind_address, bind_port, listen_backlog, &listening_port);
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (bind_port == 0)
{
socklen_t len = sizeof(addr);
if (getsockname(listen_fd, (sockaddr *)&addr, &len) == -1)
{
close(listen_fd);
throw std::runtime_error(std::string("getsockname: ") + strerror(errno));
}
listening_port = ntohs(((sockaddr_in*)&addr)->sin_port);
}
else
{
listening_port = bind_port;
}
if (listen(listen_fd, listen_backlog) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events)

View File

@@ -211,7 +211,7 @@ class osd_t
// flushing, recovery and backfill
void submit_pg_flush_ops(pg_t & pg);
void handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval);
bool submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data);
void submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data);
bool pick_next_recovery(osd_recovery_op_t &op);
void submit_recovery_op(osd_recovery_op_t *op);
bool continue_recovery();

View File

@@ -457,8 +457,7 @@ void osd_t::renew_lease()
if (err == "" && data["result"]["TTL"].string_value() == "")
{
// Die
fprintf(stderr, "Error refreshing etcd lease\n");
force_stop(1);
throw std::runtime_error("etcd lease has expired");
}
if (err != "")
{
@@ -467,8 +466,7 @@ void osd_t::renew_lease()
if (etcd_failed_attempts > st_cli.max_etcd_attempts)
{
// Die
fprintf(stderr, "Cluster connection failed\n");
force_stop(1);
throw std::runtime_error("Cluster connection failed");
}
// Retry
tfd->set_timer(st_cli.etcd_quick_timeout, false, [this](int timer_id)

View File

@@ -47,8 +47,7 @@ void osd_t::submit_pg_flush_ops(pg_t & pg)
if (l.second.size() > 0)
{
fb->flush_ops++;
if (!submit_flush_op(pg.pool_id, pg.pg_num, fb, true, l.first, l.second.size(), l.second.data()))
return;
submit_flush_op(pg.pool_id, pg.pg_num, fb, true, l.first, l.second.size(), l.second.data());
}
}
for (auto & l: fb->stable_lists)
@@ -56,8 +55,7 @@ void osd_t::submit_pg_flush_ops(pg_t & pg)
if (l.second.size() > 0)
{
fb->flush_ops++;
if (!submit_flush_op(pg.pool_id, pg.pg_num, fb, false, l.first, l.second.size(), l.second.data()))
return;
submit_flush_op(pg.pool_id, pg.pg_num, fb, false, l.first, l.second.size(), l.second.data());
}
}
}
@@ -162,7 +160,7 @@ void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, p
}
}
bool osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data)
void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data)
{
osd_op_t *op = new osd_op_t();
// Copy buffer so it gets freed along with the operation
@@ -190,8 +188,10 @@ bool osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
else
{
// Peer
int peer_fd = msgr.osd_peer_fds[peer_osd];
op->op_type = OSD_OP_OUT;
op->iov.push_back(op->buf, count * sizeof(obj_ver_id));
op->peer_fd = peer_fd;
op->req = (osd_any_op_t){
.sec_stab = {
.header = {
@@ -207,21 +207,8 @@ bool osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t
handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval);
delete op;
};
auto peer_fd_it = msgr.osd_peer_fds.find(peer_osd);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
op->peer_fd = peer_fd_it->second;
msgr.outbox_push(op);
}
else
{
// Fail it immediately
op->reply.hdr.retval = -EPIPE;
op->callback(op);
return false;
}
msgr.outbox_push(op);
}
return true;
}
bool osd_t::pick_next_recovery(osd_recovery_op_t &op)

View File

@@ -29,10 +29,8 @@ void osd_t::handle_peers()
degraded_objects += p.second.degraded_objects.size();
if (p.second.state & PG_HAS_UNCLEAN)
peering_state = peering_state | OSD_FLUSHING_PGS;
else if (p.second.state & (PG_HAS_DEGRADED | PG_HAS_MISPLACED))
else if (p.second.state & PG_HAS_DEGRADED)
peering_state = peering_state | OSD_RECOVERING;
ringloop->wakeup();
return;
}
else
{
@@ -342,7 +340,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p
else
{
// Peer
auto & cl = msgr.clients.at(msgr.osd_peer_fds.at(role_osd));
auto & cl = msgr.clients.at(msgr.osd_peer_fds[role_osd]);
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
op->peer_fd = cl->peer_fd;
@@ -396,9 +394,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
{
if (op->bs_op->retval < 0)
{
printf("Local OP_LIST failed: retval=%d\n", op->bs_op->retval);
force_stop(1);
return;
throw std::runtime_error("local OP_LIST failed");
}
add_bs_subop_stats(op);
printf(
@@ -423,7 +419,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
// Peer
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
op->peer_fd = msgr.osd_peer_fds.at(role_osd);
op->peer_fd = msgr.osd_peer_fds[role_osd];
op->req = (osd_any_op_t){
.sec_list = {
.header = {

View File

@@ -246,6 +246,7 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
// Send to a remote OSD
osd_op_t *subop = op_data->subops+subop_idx;
subop->op_type = OSD_OP_OUT;
subop->peer_fd = msgr.osd_peer_fds.at(subop_osd_num);
// FIXME: Use the pre-allocated buffer
subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev));
subop->req = (osd_any_op_t){
@@ -286,18 +287,7 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
}
handle_primary_subop(subop, cur_op);
};
auto peer_fd_it = msgr.osd_peer_fds.find(subop_osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subop->peer_fd = peer_fd_it->second;
msgr.outbox_push(subop);
}
else
{
// Fail it immediately
subop->reply.hdr.retval = -EPIPE;
subop->callback(subop);
}
msgr.outbox_push(subop);
subop_idx++;
}
prev = i+1;

View File

@@ -182,6 +182,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
else
{
subop->op_type = OSD_OP_OUT;
subop->peer_fd = msgr.osd_peer_fds.at(role_osd_num);
subop->bitmap = stripes[stripe_num].bmp_buf;
subop->bitmap_len = clean_entry_bitmap_size;
subop->req.sec_rw = {
@@ -224,18 +225,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
{
handle_primary_subop(subop, cur_op);
};
auto peer_fd_it = msgr.osd_peer_fds.find(role_osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subop->peer_fd = peer_fd_it->second;
msgr.outbox_push(subop);
}
else
{
// Fail it immediately
subop->reply.hdr.retval = -EPIPE;
subop->callback(subop);
}
msgr.outbox_push(subop);
}
i++;
}
@@ -473,6 +463,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
else
{
subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = msgr.osd_peer_fds.at(chunk.osd_num);
subops[i].req = (osd_any_op_t){ .sec_del = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
@@ -486,18 +477,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
{
handle_primary_subop(subop, cur_op);
};
auto peer_fd_it = msgr.osd_peer_fds.find(chunk.osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subops[i].peer_fd = peer_fd_it->second;
msgr.outbox_push(&subops[i]);
}
else
{
// Fail it immediately
subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]);
}
msgr.outbox_push(&subops[i]);
}
}
}
@@ -587,6 +567,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
else
{
subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = msgr.osd_peer_fds.at(stab_osd.osd_num);
subops[i].req = (osd_any_op_t){ .sec_stab = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
@@ -600,18 +581,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
{
handle_primary_subop(subop, cur_op);
};
auto peer_fd_it = msgr.osd_peer_fds.find(stab_osd.osd_num);
if (peer_fd_it != msgr.osd_peer_fds.end())
{
subops[i].peer_fd = peer_fd_it->second;
msgr.outbox_push(&subops[i]);
}
else
{
// Fail it immediately
subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]);
}
msgr.outbox_push(&subops[i]);
}
}
}

View File

@@ -8,7 +8,7 @@
#include "osd_id.h"
#ifndef MEM_ALIGNMENT
#define MEM_ALIGNMENT 4096
#define MEM_ALIGNMENT 512
#endif
struct buf_len_t

View File

@@ -134,14 +134,14 @@ int main(int narg, char *args[])
int connect_osd(const char *osd_address, int osd_port)
{
struct sockaddr_storage addr;
struct sockaddr addr;
if (!string_to_addr(osd_address, 0, osd_port, &addr))
{
fprintf(stderr, "server address: %s is not valid\n", osd_address);
return -1;
}
int connect_fd = socket(addr.ss_family, SOCK_STREAM, 0);
int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (connect_fd < 0)
{
perror("socket");

View File

@@ -67,14 +67,14 @@ int main(int narg, char *args[])
int connect_stub(const char *server_address, int server_port)
{
struct sockaddr_storage addr;
struct sockaddr addr;
if (!string_to_addr(server_address, 0, server_port, &addr))
{
fprintf(stderr, "server address: %s is not valid\n", server_address);
return -1;
}
int connect_fd = socket(addr.ss_family, SOCK_STREAM, 0);
int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (connect_fd < 0)
{
perror("socket");

View File

@@ -41,19 +41,21 @@
#include "rw_blocking.h"
#include "osd_ops.h"
int bind_stub(std::string bind_address, int bind_port);
void run_stub(int peer_fd);
int main(int narg, char *args[])
{
int listen_fd = create_and_bind_socket("0.0.0.0", 11203, 128, NULL);
int listen_fd = bind_stub("0.0.0.0", 11203);
// Accept new connections
sockaddr_storage addr;
sockaddr addr;
socklen_t peer_addr_size = sizeof(addr);
int peer_fd;
while (1)
{
printf("stub_osd: waiting for 1 client\n");
peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size);
peer_fd = accept(listen_fd, &addr, &peer_addr_size);
if (peer_fd == -1)
{
if (errno == EAGAIN)
@@ -74,6 +76,39 @@ int main(int narg, char *args[])
return 0;
}
int bind_stub(std::string bind_address, int bind_port)
{
int listen_backlog = 128;
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (listen(listen_fd, listen_backlog) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
return listen_fd;
}
void run_stub(int peer_fd)
{
osd_any_op_t op;

View File

@@ -25,6 +25,8 @@
#include "epoll_manager.h"
#include "messenger.h"
int bind_stub(std::string bind_address, int bind_port);
void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op);
int main(int narg, char *args[])
@@ -41,8 +43,7 @@ int main(int narg, char *args[])
json11::Json config = json11::Json::object { { "log_level", 1 } };
msgr->parse_config(config);
// Accept new connections
int listen_fd = create_and_bind_socket("0.0.0.0", 11203, 128, NULL);
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
int listen_fd = bind_stub("0.0.0.0", 11203);
epmgr->set_fd_handler(listen_fd, false, [listen_fd, msgr](int fd, int events)
{
msgr->accept_connections(listen_fd);
@@ -66,6 +67,41 @@ int main(int narg, char *args[])
return 0;
}
int bind_stub(std::string bind_address, int bind_port)
{
int listen_backlog = 128;
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
}
if (listen(listen_fd, listen_backlog) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("listen: ") + strerror(errno));
}
fcntl(listen_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
return listen_fd;
}
void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op)
{
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;

View File

@@ -6,7 +6,7 @@ includedir=${prefix}/@CMAKE_INSTALL_INCLUDEDIR@
Name: Vitastor
Description: Vitastor client library
Version: 0.6.15
Version: 0.6.12
Libs: -L${libdir} -lvitastor_client
Cflags: -I${includedir}