Compare commits
15 Commits
v1.3.1
...
recovery-a
Author | SHA1 | Date | |
---|---|---|---|
d4ebbeaf5c | |||
bf0c29a46c | |||
aca2bef15f | |||
4dd6e89263 | |||
9bac99ffb6 | |||
62ed130960 | |||
9c7755b6e8 | |||
691ebd991a | |||
6d5df908a3 | |||
fa87769ed8 | |||
2ce8292803 | |||
7f8f7ded52 | |||
68553eabbb | |||
3147c5c8d5 | |||
576e2ae608 |
@@ -35,9 +35,9 @@ RUN (echo deb http://vitastor.io/debian bookworm main > /etc/apt/sources.list.d/
|
||||
wget -q -O /etc/apt/trusted.gpg.d/vitastor.gpg https://vitastor.io/debian/pubkey.gpg && \
|
||||
apt-get update && \
|
||||
apt-get install -y vitastor-client && \
|
||||
apt-get download qemu-system-common && \
|
||||
apt-get download qemu-block-extra && \
|
||||
dpkg -x qemu-system-common*.deb tmp1 && \
|
||||
wget https://vitastor.io/archive/qemu/qemu-bookworm-8.1.2%2Bds-1%2Bvitastor1/qemu-utils_8.1.2%2Bds-1%2Bvitastor1_amd64.deb && \
|
||||
wget https://vitastor.io/archive/qemu/qemu-bookworm-8.1.2%2Bds-1%2Bvitastor1/qemu-block-extra_8.1.2%2Bds-1%2Bvitastor1_amd64.deb && \
|
||||
dpkg -x qemu-utils*.deb tmp1 && \
|
||||
dpkg -x qemu-block-extra*.deb tmp1 && \
|
||||
cp -a tmp1/usr/bin/qemu-storage-daemon /usr/bin/ && \
|
||||
mkdir -p /usr/lib/x86_64-linux-gnu/qemu && \
|
||||
|
@@ -97,6 +97,15 @@ func GetConnectionParams(params map[string]string) (map[string]string, error)
|
||||
}
|
||||
case []string:
|
||||
etcdUrl = config["etcd_address"].([]string)
|
||||
case []interface{}:
|
||||
for _, url := range config["etcd_address"].([]interface{})
|
||||
{
|
||||
s, ok := url.(string)
|
||||
if (ok)
|
||||
{
|
||||
etcdUrl = append(etcdUrl, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (len(etcdUrl) == 0)
|
||||
{
|
||||
@@ -105,8 +114,9 @@ func GetConnectionParams(params map[string]string) (map[string]string, error)
|
||||
return ctxVars, nil
|
||||
}
|
||||
|
||||
func system(program string, args ...string) ([]byte, error)
|
||||
func system(program string, args ...string) ([]byte, []byte, error)
|
||||
{
|
||||
klog.Infof("Running "+program+" "+strings.Join(args, " "))
|
||||
c := exec.Command(program, args...)
|
||||
var stdout, stderr bytes.Buffer
|
||||
c.Stdout, c.Stderr = &stdout, &stderr
|
||||
@@ -115,9 +125,9 @@ func system(program string, args ...string) ([]byte, error)
|
||||
{
|
||||
stdoutStr, stderrStr := string(stdout.Bytes()), string(stderr.Bytes())
|
||||
klog.Errorf(program+" "+strings.Join(args, " ")+" failed: %s, status %s\n", stdoutStr+stderrStr, err)
|
||||
return nil, status.Error(codes.Internal, stdoutStr+stderrStr+" (status "+err.Error()+")")
|
||||
return nil, nil, status.Error(codes.Internal, stdoutStr+stderrStr+" (status "+err.Error()+")")
|
||||
}
|
||||
return stdout.Bytes(), nil
|
||||
return stdout.Bytes(), stderr.Bytes(), nil
|
||||
}
|
||||
|
||||
func invokeCLI(ctxVars map[string]string, args []string) ([]byte, error)
|
||||
@@ -126,7 +136,8 @@ func invokeCLI(ctxVars map[string]string, args []string) ([]byte, error)
|
||||
{
|
||||
args = append(args, "--config_path", ctxVars["configPath"])
|
||||
}
|
||||
return system("/usr/bin/vitastor-cli", args...)
|
||||
stdout, _, err := system("/usr/bin/vitastor-cli", args...)
|
||||
return stdout, err
|
||||
}
|
||||
|
||||
// Create the volume
|
||||
|
@@ -7,6 +7,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
@@ -154,8 +155,13 @@ func (ns *NodeServer) mapNbd(volName string, ctxVars map[string]string, readonly
|
||||
{
|
||||
args = append(args, "--readonly", "1")
|
||||
}
|
||||
dev, err := system("/usr/bin/vitastor-nbd", args...)
|
||||
return strings.TrimSpace(string(dev)), err
|
||||
stdout, stderr, err := system("/usr/bin/vitastor-nbd", args...)
|
||||
dev := strings.TrimSpace(string(stdout))
|
||||
if (dev == "")
|
||||
{
|
||||
return "", fmt.Errorf("vitastor-nbd did not return the name of NBD device. output: %s", stderr)
|
||||
}
|
||||
return dev, err
|
||||
}
|
||||
|
||||
func (ns *NodeServer) unmapNbd(devicePath string)
|
||||
@@ -170,6 +176,7 @@ func (ns *NodeServer) unmapNbd(devicePath string)
|
||||
|
||||
func findByPidFile(pidFile string) (*os.Process, error)
|
||||
{
|
||||
klog.Infof("killing process with PID from file %s", pidFile)
|
||||
pidBuf, err := os.ReadFile(pidFile)
|
||||
if (err != nil)
|
||||
{
|
||||
@@ -221,7 +228,7 @@ func startStorageDaemon(vdpaId, volName, pidFile, configPath string, readonly bo
|
||||
{
|
||||
writable = "false"
|
||||
}
|
||||
_, err := system(
|
||||
_, _, err := system(
|
||||
"/usr/bin/qemu-storage-daemon", "--daemonize", "--pidfile", pidFile, "--blockdev", string(blockSpecJson),
|
||||
"--export", "vduse-blk,id="+vdpaId+",node-name=disk1,name="+vdpaId+",num-queues=16,queue-size=128,writable="+writable,
|
||||
)
|
||||
@@ -234,7 +241,7 @@ func (ns *NodeServer) mapVduse(volName string, ctxVars map[string]string, readon
|
||||
stateFd, err := os.CreateTemp(ns.stateDir, "vitastor-vduse-*.json")
|
||||
if (err != nil)
|
||||
{
|
||||
return "", "", status.Error(codes.Internal, err.Error())
|
||||
return "", "", err
|
||||
}
|
||||
stateFile := stateFd.Name()
|
||||
stateFd.Close()
|
||||
@@ -246,11 +253,12 @@ func (ns *NodeServer) mapVduse(volName string, ctxVars map[string]string, readon
|
||||
if (err == nil)
|
||||
{
|
||||
// Add device to VDPA bus
|
||||
_, err = system("/sbin/vdpa", "-j", "dev", "add", "name", vdpaId, "mgmtdev", "vduse")
|
||||
_, _, err = system("/sbin/vdpa", "-j", "dev", "add", "name", vdpaId, "mgmtdev", "vduse")
|
||||
if (err == nil)
|
||||
{
|
||||
// Find block device name
|
||||
matches, err := filepath.Glob("/sys/bus/vdpa/devices/"+vdpaId+"/virtio*/block/*")
|
||||
var matches []string
|
||||
matches, err = filepath.Glob("/sys/bus/vdpa/devices/"+vdpaId+"/virtio*/block/*")
|
||||
if (err == nil && len(matches) == 0)
|
||||
{
|
||||
err = errors.New("/sys/bus/vdpa/devices/"+vdpaId+"/virtio*/block/* is not found")
|
||||
@@ -277,21 +285,14 @@ func (ns *NodeServer) mapVduse(volName string, ctxVars map[string]string, readon
|
||||
}
|
||||
}
|
||||
}
|
||||
if (err != nil)
|
||||
{
|
||||
err = status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
}
|
||||
if (err != nil)
|
||||
killErr := killByPidFile(pidFile)
|
||||
if (killErr != nil)
|
||||
{
|
||||
killErr := killByPidFile(pidFile)
|
||||
if (killErr != nil)
|
||||
{
|
||||
klog.Errorf("Failed to kill started qemu-storage-daemon: %v", killErr)
|
||||
}
|
||||
os.Remove(stateFile)
|
||||
os.Remove(pidFile)
|
||||
klog.Errorf("Failed to kill started qemu-storage-daemon: %v", killErr)
|
||||
}
|
||||
os.Remove(stateFile)
|
||||
os.Remove(pidFile)
|
||||
}
|
||||
return "", "", err
|
||||
}
|
||||
@@ -337,7 +338,7 @@ func (ns *NodeServer) unmapVduseById(vdpaId string)
|
||||
}
|
||||
else
|
||||
{
|
||||
_, _ = system("/sbin/vdpa", "-j", "dev", "del", vdpaId)
|
||||
_, _, _ = system("/sbin/vdpa", "-j", "dev", "del", vdpaId)
|
||||
}
|
||||
stateFile := ns.stateDir + vdpaId + ".json"
|
||||
os.Remove(stateFile)
|
||||
@@ -377,7 +378,7 @@ func (ns *NodeServer) restoreVduseDaemons()
|
||||
}
|
||||
devList := make(map[string]interface{})
|
||||
// example output: {"dev":{"test1":{"type":"block","mgmtdev":"vduse","vendor_id":0,"max_vqs":16,"max_vq_size":128}}}
|
||||
devListJSON, err := system("/sbin/vdpa", "-j", "dev", "list")
|
||||
devListJSON, _, err := system("/sbin/vdpa", "-j", "dev", "list")
|
||||
if (err != nil)
|
||||
{
|
||||
return
|
||||
@@ -456,13 +457,13 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to create block device mount target %s with error: %v", targetPath, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
err = pathFile.Close()
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to close %s with error: %v", targetPath, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
else
|
||||
@@ -471,13 +472,13 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to create fs mount target %s with error: %v", targetPath, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -597,7 +598,7 @@ unmap:
|
||||
{
|
||||
ns.unmapVduseById(vdpaId)
|
||||
}
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// NodeUnpublishVolume unmounts the volume from the target path
|
||||
@@ -612,7 +613,7 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
||||
{
|
||||
return nil, status.Error(codes.NotFound, "Target path not found")
|
||||
}
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
if (devicePath == "")
|
||||
{
|
||||
@@ -625,7 +626,7 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
||||
err = mount.CleanupMountPoint(targetPath, ns.mounter, false)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
// unmap NBD device
|
||||
if (refCount == 1)
|
||||
|
4
debian/patched-qemu.Dockerfile
vendored
4
debian/patched-qemu.Dockerfile
vendored
@@ -7,7 +7,7 @@ ARG REL=
|
||||
|
||||
WORKDIR /root
|
||||
|
||||
RUN if [ "$REL" = "buster" -o "$REL" = "bullseye" ]; then \
|
||||
RUN if [ "$REL" = "buster" -o "$REL" = "bullseye" -o "$REL" = "bookworm" ]; then \
|
||||
echo "deb http://deb.debian.org/debian $REL-backports main" >> /etc/apt/sources.list; \
|
||||
echo >> /etc/apt/preferences; \
|
||||
echo 'Package: *' >> /etc/apt/preferences; \
|
||||
@@ -45,7 +45,7 @@ RUN set -e; \
|
||||
rm -rf /root/packages/qemu-$REL/*; \
|
||||
cd /root/packages/qemu-$REL; \
|
||||
dpkg-source -x /root/qemu*.dsc; \
|
||||
QEMU_VER=$(ls -d qemu*/ | perl -pe 's!^.*(\d+\.\d+).*!$1!'); \
|
||||
QEMU_VER=$(ls -d qemu*/ | perl -pe 's!^.*?(\d+\.\d+).*!$1!'); \
|
||||
D=$(ls -d qemu*/); \
|
||||
cp /root/vitastor/patches/qemu-$QEMU_VER-vitastor.patch ./qemu-*/debian/patches; \
|
||||
echo qemu-$QEMU_VER-vitastor.patch >> $D/debian/patches/series; \
|
||||
|
@@ -18,7 +18,7 @@
|
||||
stable version from 0.9.x branch instead of 1.x
|
||||
- For Debian 10 (Buster) also enable backports repository:
|
||||
`deb http://deb.debian.org/debian buster-backports main`
|
||||
- Install packages: `apt update; apt install vitastor lp-solve etcd linux-image-amd64 qemu`
|
||||
- Install packages: `apt update; apt install vitastor lp-solve etcd linux-image-amd64 qemu-system-x86`
|
||||
|
||||
## CentOS
|
||||
|
||||
|
@@ -18,7 +18,7 @@
|
||||
установить последнюю стабильную версию из ветки 0.9.x вместо 1.x
|
||||
- Для Debian 10 (Buster) также включите репозиторий backports:
|
||||
`deb http://deb.debian.org/debian buster-backports main`
|
||||
- Установите пакеты: `apt update; apt install vitastor lp-solve etcd linux-image-amd64 qemu`
|
||||
- Установите пакеты: `apt update; apt install vitastor lp-solve etcd linux-image-amd64 qemu-system-x86`
|
||||
|
||||
## CentOS
|
||||
|
||||
|
@@ -17,6 +17,7 @@ It supports the following commands:
|
||||
- [purge](#purge)
|
||||
- [read-sb](#read-sb)
|
||||
- [write-sb](#write-sb)
|
||||
- [update-sb](#update-sb)
|
||||
- [udev](#udev)
|
||||
- [exec-osd](#exec-osd)
|
||||
- [pre-exec](#pre-exec)
|
||||
@@ -182,6 +183,14 @@ Try to read Vitastor OSD superblock from `<device>` and print it in JSON format.
|
||||
|
||||
Read JSON from STDIN and write it into Vitastor OSD superblock on `<device>`.
|
||||
|
||||
## update-sb
|
||||
|
||||
`vitastor-disk update-sb <device> [--force] [--<parameter> <value>] [...]`
|
||||
|
||||
Read Vitastor OSD superblock from `<device>`, update parameters in it and write it back.
|
||||
|
||||
`--force` allows reading the superblock even if it fails validation.
|
||||
|
||||
## udev
|
||||
|
||||
`vitastor-disk udev <device>`
|
||||
|
@@ -17,6 +17,7 @@ vitastor-disk - инструмент командной строки для уп
|
||||
- [purge](#purge)
|
||||
- [read-sb](#read-sb)
|
||||
- [write-sb](#write-sb)
|
||||
- [update-sb](#update-sb)
|
||||
- [udev](#udev)
|
||||
- [exec-osd](#exec-osd)
|
||||
- [pre-exec](#pre-exec)
|
||||
@@ -187,6 +188,15 @@ throttle_target_mbs, throttle_target_parallelism, throttle_threshold_us.
|
||||
|
||||
Прочитать JSON со стандартного ввода и записать его в суперблок OSD на диск `<device>`.
|
||||
|
||||
## update-sb
|
||||
|
||||
`vitastor-disk update-sb <device> [--force] [--<параметр> <значение>] [...]`
|
||||
|
||||
Прочитать суперблок OSD с диска `<device>`, изменить в нём заданные параметры и записать обратно.
|
||||
|
||||
Опция `--force` позволяет читать суперблок, даже если он считается некорректным
|
||||
из-за ошибок валидации.
|
||||
|
||||
## udev
|
||||
|
||||
`vitastor-disk udev <device>`
|
||||
|
@@ -146,7 +146,7 @@ Example performance comparison:
|
||||
| 4k random read Q1 | 9600 iops | 7640 iops | 7780 iops |
|
||||
|
||||
To try VDUSE you need at least Linux 5.15, built with VDUSE support
|
||||
(CONFIG_VIRTIO_VDPA=m, CONFIG_VDPA_USER=m, CONFIG_VIRTIO_VDPA=m).
|
||||
(CONFIG_VDPA=m, CONFIG_VDPA_USER=m, CONFIG_VIRTIO_VDPA=m).
|
||||
|
||||
Debian Linux kernels have these options disabled for now, so if you want to try it on Debian,
|
||||
use a kernel from Ubuntu [kernel-ppa/mainline](https://kernel.ubuntu.com/~kernel-ppa/mainline/), Proxmox,
|
||||
|
@@ -149,7 +149,7 @@ VDUSE - на данный момент лучший интерфейс для п
|
||||
| 4k случайное чтение Q1 | 9600 iops | 7640 iops | 7780 iops |
|
||||
|
||||
Чтобы попробовать VDUSE, вам нужно ядро Linux как минимум версии 5.15, собранное с поддержкой
|
||||
VDUSE (CONFIG_VIRTIO_VDPA=m, CONFIG_VDPA_USER=m, CONFIG_VIRTIO_VDPA=m).
|
||||
VDUSE (CONFIG_VDPA=m, CONFIG_VDPA_USER=m, CONFIG_VIRTIO_VDPA=m).
|
||||
|
||||
В ядрах в Debian Linux поддержка пока отключена по умолчанию, так что чтобы попробовать VDUSE
|
||||
на Debian, поставьте ядро из Ubuntu [kernel-ppa/mainline](https://kernel.ubuntu.com/~kernel-ppa/mainline/),
|
||||
|
10
mon/mon.js
10
mon/mon.js
@@ -110,7 +110,15 @@ const etcd_tree = {
|
||||
autosync_interval: 5,
|
||||
autosync_writes: 128,
|
||||
client_queue_depth: 128, // unused
|
||||
recovery_queue_depth: 4,
|
||||
recovery_queue_depth: 1,
|
||||
recovery_sleep_us: 0,
|
||||
recovery_tune_min_util: 0.1,
|
||||
recovery_tune_min_client_util: 0,
|
||||
recovery_tune_max_util: 1.0,
|
||||
recovery_tune_max_client_util: 0.5,
|
||||
recovery_tune_interval: 1,
|
||||
recovery_tune_ewma_rate: 0.5,
|
||||
recovery_tune_sleep_min_us: 10, // 10 microseconds
|
||||
recovery_pg_switch: 128,
|
||||
recovery_sync_batch: 16,
|
||||
no_recovery: false,
|
||||
|
@@ -732,8 +732,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
||||
resume:
|
||||
while (pos < bs->journal.block_size)
|
||||
{
|
||||
journal_entry *je = (journal_entry*)((uint8_t*)buf + proc_pos - done_pos + pos);
|
||||
if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
|
||||
auto buf_pos = proc_pos - done_pos + pos;
|
||||
journal_entry *je = (journal_entry*)((uint8_t*)buf + buf_pos);
|
||||
if (je->magic != JOURNAL_MAGIC || buf_pos+je->size > len || je_crc32(je) != je->crc32 ||
|
||||
je->type < JE_MIN || je->type > JE_MAX || started && je->crc32_prev != crc32_last)
|
||||
{
|
||||
if (pos == 0)
|
||||
|
@@ -127,6 +127,10 @@ static const char *help_text =
|
||||
"vitastor-disk write-sb <device>\n"
|
||||
" Read JSON from STDIN and write it into Vitastor OSD superblock on <device>.\n"
|
||||
"\n"
|
||||
"vitastor-disk update-sb <device> [--force] [--<parameter> <value>] [...]\n"
|
||||
" Read Vitastor OSD superblock from <device>, update parameters in it and write it back.\n"
|
||||
" --force allows to ignore validation errors.\n"
|
||||
"\n"
|
||||
"vitastor-disk udev <device>\n"
|
||||
" Try to read Vitastor OSD superblock from <device> and print variables for udev.\n"
|
||||
"\n"
|
||||
@@ -363,6 +367,15 @@ int main(int argc, char *argv[])
|
||||
}
|
||||
return self.write_sb(cmd[1]);
|
||||
}
|
||||
else if (!strcmp(cmd[0], "update-sb"))
|
||||
{
|
||||
if (cmd.size() != 2)
|
||||
{
|
||||
fprintf(stderr, "Exactly 1 device path argument is required\n");
|
||||
return 1;
|
||||
}
|
||||
return self.update_sb(cmd[1]);
|
||||
}
|
||||
else if (!strcmp(cmd[0], "start") || !strcmp(cmd[0], "stop") ||
|
||||
!strcmp(cmd[0], "restart") || !strcmp(cmd[0], "enable") || !strcmp(cmd[0], "disable"))
|
||||
{
|
||||
|
@@ -109,6 +109,7 @@ struct disk_tool_t
|
||||
int udev_import(std::string device);
|
||||
int read_sb(std::string device);
|
||||
int write_sb(std::string device);
|
||||
int update_sb(std::string device);
|
||||
int exec_osd(std::string device);
|
||||
int systemd_start_stop_osds(const std::vector<std::string> & cmd, const std::vector<std::string> & devices);
|
||||
int pre_exec_osd(std::string device);
|
||||
|
@@ -86,6 +86,24 @@ int disk_tool_t::write_sb(std::string device)
|
||||
return !write_osd_superblock(device, params);
|
||||
}
|
||||
|
||||
int disk_tool_t::update_sb(std::string device)
|
||||
{
|
||||
json11::Json sb = read_osd_superblock(device, true, options.find("force") != options.end());
|
||||
if (sb.is_null())
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
auto sb_obj = sb["params"].object_items();
|
||||
for (auto & kv: options)
|
||||
{
|
||||
if (kv.first != "force")
|
||||
{
|
||||
sb_obj[kv.first] = kv.second;
|
||||
}
|
||||
}
|
||||
return !write_osd_superblock(device, sb_obj);
|
||||
}
|
||||
|
||||
uint32_t disk_tool_t::write_osd_superblock(std::string device, json11::Json params)
|
||||
{
|
||||
std::string json_data = params.dump();
|
||||
|
@@ -135,8 +135,8 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t
|
||||
{
|
||||
if (this->log_level > 0)
|
||||
{
|
||||
printf(
|
||||
"Warning: etcd request failed: %s, retrying %d more times\n",
|
||||
fprintf(
|
||||
stderr, "Warning: etcd request failed: %s, retrying %d more times\n",
|
||||
err.c_str(), retries
|
||||
);
|
||||
}
|
||||
@@ -333,7 +333,7 @@ void etcd_state_client_t::start_etcd_watcher()
|
||||
etcd_watch_ws = NULL;
|
||||
}
|
||||
if (this->log_level > 1)
|
||||
printf("Trying to connect to etcd websocket at %s\n", etcd_address.c_str());
|
||||
fprintf(stderr, "Trying to connect to etcd websocket at %s\n", etcd_address.c_str());
|
||||
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", etcd_slow_timeout,
|
||||
[this, cur_addr = selected_etcd_address](const http_response_t *msg)
|
||||
{
|
||||
|
66
src/osd.cpp
66
src/osd.cpp
@@ -68,14 +68,21 @@ osd_t::osd_t(const json11::Json & config, ring_loop_t *ringloop)
|
||||
}
|
||||
}
|
||||
|
||||
print_stats_timer_id = this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
|
||||
if (print_stats_timer_id == -1)
|
||||
{
|
||||
print_stats();
|
||||
});
|
||||
slow_log_timer_id = this->tfd->set_timer(slow_log_interval*1000, true, [this](int timer_id)
|
||||
print_stats_timer_id = this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
|
||||
{
|
||||
print_stats();
|
||||
});
|
||||
}
|
||||
if (slow_log_timer_id == -1)
|
||||
{
|
||||
print_slow();
|
||||
});
|
||||
slow_log_timer_id = this->tfd->set_timer(slow_log_interval*1000, true, [this](int timer_id)
|
||||
{
|
||||
print_slow();
|
||||
});
|
||||
}
|
||||
apply_recovery_tune_interval();
|
||||
|
||||
msgr.tfd = this->tfd;
|
||||
msgr.ringloop = this->ringloop;
|
||||
@@ -97,6 +104,11 @@ osd_t::~osd_t()
|
||||
tfd->clear_timer(slow_log_timer_id);
|
||||
slow_log_timer_id = -1;
|
||||
}
|
||||
if (rtune_timer_id >= 0)
|
||||
{
|
||||
tfd->clear_timer(rtune_timer_id);
|
||||
rtune_timer_id = -1;
|
||||
}
|
||||
if (print_stats_timer_id >= 0)
|
||||
{
|
||||
tfd->clear_timer(print_stats_timer_id);
|
||||
@@ -196,6 +208,22 @@ void osd_t::parse_config(bool init)
|
||||
recovery_queue_depth = config["recovery_queue_depth"].uint64_value();
|
||||
if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE)
|
||||
recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
|
||||
recovery_sleep_us = config["recovery_sleep_us"].uint64_value();
|
||||
recovery_tune_min_util = config["recovery_tune_min_util"].is_null()
|
||||
? 0.1 : config["recovery_tune_min_util"].number_value();
|
||||
recovery_tune_max_util = config["recovery_tune_max_util"].is_null()
|
||||
? 1.0 : config["recovery_tune_max_util"].number_value();
|
||||
recovery_tune_min_client_util = config["recovery_tune_min_client_util"].is_null()
|
||||
? 0 : config["recovery_tune_min_client_util"].number_value();
|
||||
recovery_tune_max_client_util = config["recovery_tune_max_client_util"].is_null()
|
||||
? 0.5 : config["recovery_tune_max_client_util"].number_value();
|
||||
auto old_recovery_tune_interval = recovery_tune_interval;
|
||||
recovery_tune_interval = config["recovery_tune_interval"].is_null()
|
||||
? 1 : config["recovery_tune_interval"].uint64_value();
|
||||
recovery_tune_ewma_rate = config["recovery_tune_ewma_rate"].is_null()
|
||||
? 0.5 : config["recovery_tune_ewma_rate"].number_value();
|
||||
recovery_tune_sleep_min_us = config["recovery_tune_sleep_min_us"].is_null()
|
||||
? 10 : config["recovery_tune_sleep_min_us"].uint64_value();
|
||||
recovery_pg_switch = config["recovery_pg_switch"].uint64_value();
|
||||
if (recovery_pg_switch < 1)
|
||||
recovery_pg_switch = DEFAULT_RECOVERY_PG_SWITCH;
|
||||
@@ -274,6 +302,10 @@ void osd_t::parse_config(bool init)
|
||||
print_slow();
|
||||
});
|
||||
}
|
||||
if (old_recovery_tune_interval != recovery_tune_interval)
|
||||
{
|
||||
apply_recovery_tune_interval();
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::bind_socket()
|
||||
@@ -421,14 +453,6 @@ void osd_t::exec_op(osd_op_t *cur_op)
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::reset_stats()
|
||||
{
|
||||
msgr.stats = {};
|
||||
prev_stats = {};
|
||||
memset(recovery_stat_count, 0, sizeof(recovery_stat_count));
|
||||
memset(recovery_stat_bytes, 0, sizeof(recovery_stat_bytes));
|
||||
}
|
||||
|
||||
void osd_t::print_stats()
|
||||
{
|
||||
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
|
||||
@@ -466,19 +490,19 @@ void osd_t::print_stats()
|
||||
}
|
||||
for (int i = 0; i < 2; i++)
|
||||
{
|
||||
if (recovery_stat_count[0][i] != recovery_stat_count[1][i])
|
||||
if (recovery_stat[i].count > recovery_print_prev[i].count)
|
||||
{
|
||||
uint64_t bw = (recovery_stat_bytes[0][i] - recovery_stat_bytes[1][i]) / print_stats_interval;
|
||||
uint64_t bw = (recovery_stat[i].bytes - recovery_print_prev[i].bytes) / print_stats_interval;
|
||||
printf(
|
||||
"[OSD %lu] %s recovery: %.1f op/s, B/W: %.2f %s\n", osd_num, recovery_stat_names[i],
|
||||
(recovery_stat_count[0][i] - recovery_stat_count[1][i]) * 1.0 / print_stats_interval,
|
||||
"[OSD %lu] %s recovery: %.1f op/s, B/W: %.2f %s, avg lat %ld us\n", osd_num, recovery_stat_names[i],
|
||||
(recovery_stat[i].count - recovery_print_prev[i].count) * 1.0 / print_stats_interval,
|
||||
(bw > 1024*1024*1024 ? bw/1024.0/1024/1024 : (bw > 1024*1024 ? bw/1024.0/1024 : bw/1024.0)),
|
||||
(bw > 1024*1024*1024 ? "GB/s" : (bw > 1024*1024 ? "MB/s" : "KB/s"))
|
||||
(bw > 1024*1024*1024 ? "GB/s" : (bw > 1024*1024 ? "MB/s" : "KB/s")),
|
||||
(recovery_stat[i].usec - recovery_print_prev[i].usec) / (recovery_stat[i].count - recovery_print_prev[i].count)
|
||||
);
|
||||
recovery_stat_count[1][i] = recovery_stat_count[0][i];
|
||||
recovery_stat_bytes[1][i] = recovery_stat_bytes[0][i];
|
||||
}
|
||||
}
|
||||
memcpy(recovery_print_prev, recovery_stat, sizeof(recovery_stat));
|
||||
if (corrupted_objects > 0)
|
||||
{
|
||||
printf("[OSD %lu] %lu object(s) corrupted\n", osd_num, corrupted_objects);
|
||||
|
34
src/osd.h
34
src/osd.h
@@ -34,7 +34,7 @@
|
||||
#define DEFAULT_AUTOSYNC_INTERVAL 5
|
||||
#define DEFAULT_AUTOSYNC_WRITES 128
|
||||
#define MAX_RECOVERY_QUEUE 2048
|
||||
#define DEFAULT_RECOVERY_QUEUE 4
|
||||
#define DEFAULT_RECOVERY_QUEUE 1
|
||||
#define DEFAULT_RECOVERY_PG_SWITCH 128
|
||||
#define DEFAULT_RECOVERY_BATCH 16
|
||||
|
||||
@@ -87,6 +87,11 @@ struct osd_chain_read_t
|
||||
|
||||
struct osd_rmw_stripe_t;
|
||||
|
||||
// Cumulative counters of finished recovery operations:
// - count: number of operations
// - usec: total time spent, in microseconds
// - bytes: total amount of data written
//
// Members are zero-initialized by default, so an instance declared without
// an explicit initializer never starts with indeterminate counters.
// The type stays trivially copyable, so it can still be snapshotted with memcpy().
struct recovery_stat_t
{
    uint64_t count = 0, usec = 0, bytes = 0;
};
|
||||
|
||||
class osd_t
|
||||
{
|
||||
// config
|
||||
@@ -111,7 +116,15 @@ class osd_t
|
||||
int immediate_commit = IMMEDIATE_NONE;
|
||||
int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // "emergency" sync every 5 seconds
|
||||
int autosync_writes = DEFAULT_AUTOSYNC_WRITES;
|
||||
int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
|
||||
uint64_t recovery_queue_depth = 1;
|
||||
uint64_t recovery_sleep_us = 0;
|
||||
double recovery_tune_min_util = 0.1;
|
||||
double recovery_tune_min_client_util = 0;
|
||||
double recovery_tune_max_util = 1.0;
|
||||
double recovery_tune_max_client_util = 0.5;
|
||||
int recovery_tune_interval = 1;
|
||||
double recovery_tune_ewma_rate = 0.5;
|
||||
int recovery_tune_sleep_min_us = 10;
|
||||
int recovery_pg_switch = DEFAULT_RECOVERY_PG_SWITCH;
|
||||
int recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
|
||||
int inode_vanish_time = 60;
|
||||
@@ -189,8 +202,17 @@ class osd_t
|
||||
std::map<uint64_t, inode_stats_t> inode_stats;
|
||||
std::map<uint64_t, timespec> vanishing_inodes;
|
||||
const char* recovery_stat_names[2] = { "degraded", "misplaced" };
|
||||
uint64_t recovery_stat_count[2][2] = {};
|
||||
uint64_t recovery_stat_bytes[2][2] = {};
|
||||
recovery_stat_t recovery_stat[2];
|
||||
recovery_stat_t recovery_print_prev[2];
|
||||
|
||||
// recovery auto-tuning
|
||||
int rtune_timer_id = -1;
|
||||
uint64_t rtune_avg_lat = 0;
|
||||
double rtune_client_util = 0, rtune_target_util = 1;
|
||||
osd_op_stats_t rtune_prev_stats;
|
||||
recovery_stat_t rtune_prev_recovery[2];
|
||||
uint64_t recovery_target_queue_depth = 1;
|
||||
uint64_t recovery_target_sleep_us = 0;
|
||||
|
||||
// cluster connection
|
||||
void parse_config(bool init);
|
||||
@@ -208,8 +230,9 @@ class osd_t
|
||||
void create_osd_state();
|
||||
void renew_lease(bool reload);
|
||||
void print_stats();
|
||||
void tune_recovery();
|
||||
void apply_recovery_tune_interval();
|
||||
void print_slow();
|
||||
void reset_stats();
|
||||
json11::Json get_statistics();
|
||||
void report_statistics();
|
||||
void report_pg_state(pg_t & pg);
|
||||
@@ -238,6 +261,7 @@ class osd_t
|
||||
bool submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data);
|
||||
bool pick_next_recovery(osd_recovery_op_t &op);
|
||||
void submit_recovery_op(osd_recovery_op_t *op);
|
||||
void finish_recovery_op(osd_recovery_op_t *op);
|
||||
bool continue_recovery();
|
||||
pg_osd_set_state_t* change_osd_set(pg_osd_set_state_t *st, pg_t *pg);
|
||||
|
||||
|
@@ -213,12 +213,14 @@ json11::Json osd_t::get_statistics()
|
||||
st["subop_stats"] = subop_stats;
|
||||
st["recovery_stats"] = json11::Json::object {
|
||||
{ recovery_stat_names[0], json11::Json::object {
|
||||
{ "count", recovery_stat_count[0][0] },
|
||||
{ "bytes", recovery_stat_bytes[0][0] },
|
||||
{ "count", recovery_stat[0].count },
|
||||
{ "bytes", recovery_stat[0].bytes },
|
||||
{ "usec", recovery_stat[0].usec },
|
||||
} },
|
||||
{ recovery_stat_names[1], json11::Json::object {
|
||||
{ "count", recovery_stat_count[0][1] },
|
||||
{ "bytes", recovery_stat_bytes[0][1] },
|
||||
{ "count", recovery_stat[1].count },
|
||||
{ "bytes", recovery_stat[1].bytes },
|
||||
{ "usec", recovery_stat[1].usec },
|
||||
} },
|
||||
};
|
||||
return st;
|
||||
|
@@ -325,30 +325,113 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op)
|
||||
{
|
||||
printf("Recovery operation done for %lx:%lx\n", op->oid.inode, op->oid.stripe);
|
||||
}
|
||||
// CAREFUL! op = &recovery_ops[op->oid]. Don't access op->* after recovery_ops.erase()
|
||||
op->osd_op = NULL;
|
||||
recovery_ops.erase(op->oid);
|
||||
delete osd_op;
|
||||
if (immediate_commit != IMMEDIATE_ALL)
|
||||
if (recovery_target_sleep_us)
|
||||
{
|
||||
recovery_done++;
|
||||
if (recovery_done >= recovery_sync_batch)
|
||||
this->tfd->set_timer_us(recovery_target_sleep_us, false, [this, op](int timer_id)
|
||||
{
|
||||
// Force sync every <recovery_sync_batch> operations
|
||||
// This is required not to pile up an excessive amount of delete operations
|
||||
autosync();
|
||||
recovery_done = 0;
|
||||
}
|
||||
finish_recovery_op(op);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
finish_recovery_op(op);
|
||||
}
|
||||
continue_recovery();
|
||||
};
|
||||
exec_op(op->osd_op);
|
||||
}
|
||||
|
||||
void osd_t::apply_recovery_tune_interval()
|
||||
{
|
||||
if (rtune_timer_id >= 0)
|
||||
{
|
||||
tfd->clear_timer(rtune_timer_id);
|
||||
rtune_timer_id = -1;
|
||||
}
|
||||
if (recovery_tune_interval != 0)
|
||||
{
|
||||
rtune_timer_id = this->tfd->set_timer(recovery_tune_interval*1000, true, [this](int timer_id)
|
||||
{
|
||||
tune_recovery();
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
recovery_target_queue_depth = recovery_queue_depth;
|
||||
recovery_target_sleep_us = recovery_sleep_us;
|
||||
}
|
||||
}
|
||||
|
||||
// Dispose of a completed recovery operation and try to start the next ones.
//
// Frees the attached OSD operation and removes the entry from recovery_ops.
// Unless immediate_commit is IMMEDIATE_ALL, also counts finished operations
// and calls autosync() after every <recovery_sync_batch> of them.
void osd_t::finish_recovery_op(osd_recovery_op_t *op)
{
    delete op->osd_op;
    op->osd_op = NULL;
    // CAREFUL! op = &recovery_ops[op->oid]. <op> must not be touched after this erase()
    recovery_ops.erase(op->oid);
    if (immediate_commit != IMMEDIATE_ALL && ++recovery_done >= recovery_sync_batch)
    {
        // A forced sync is needed once per batch so that delete operations
        // do not pile up in an excessive amount
        autosync();
        recovery_done = 0;
    }
    continue_recovery();
}
|
||||
|
||||
void osd_t::tune_recovery()
|
||||
{
|
||||
static int total_client_ops[] = { OSD_OP_READ, OSD_OP_WRITE, OSD_OP_SYNC, OSD_OP_DELETE };
|
||||
uint64_t total_client_usec = 0;
|
||||
for (int i = 0; i < sizeof(total_client_ops)/sizeof(total_client_ops[0]); i++)
|
||||
{
|
||||
total_client_usec += (msgr.stats.op_stat_sum[total_client_ops[i]] - rtune_prev_stats.op_stat_sum[total_client_ops[i]]);
|
||||
rtune_prev_stats.op_stat_sum[total_client_ops[i]] = msgr.stats.op_stat_sum[total_client_ops[i]];
|
||||
}
|
||||
uint64_t total_recovery_usec = 0, recovery_count = 0;
|
||||
total_recovery_usec += recovery_stat[0].usec-rtune_prev_recovery[0].usec;
|
||||
total_recovery_usec += recovery_stat[1].usec-rtune_prev_recovery[1].usec;
|
||||
recovery_count += recovery_stat[0].count-rtune_prev_recovery[0].count;
|
||||
recovery_count += recovery_stat[1].count-rtune_prev_recovery[1].count;
|
||||
memcpy(rtune_prev_recovery, recovery_stat, sizeof(recovery_stat));
|
||||
if (recovery_count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
rtune_avg_lat = total_recovery_usec/recovery_count*recovery_tune_ewma_rate +
|
||||
rtune_avg_lat*(1-recovery_tune_ewma_rate);
|
||||
// client_util = count/interval * usec/1000000.0/count = usec/1000000.0/interval :-)
|
||||
double client_util = total_client_usec/1000000.0/recovery_tune_interval;
|
||||
rtune_client_util = rtune_client_util*(1-recovery_tune_ewma_rate) + client_util*recovery_tune_ewma_rate;
|
||||
rtune_target_util = (rtune_client_util < recovery_tune_min_client_util
|
||||
? recovery_tune_max_util
|
||||
: recovery_tune_min_util + (rtune_client_util >= recovery_tune_max_client_util
|
||||
? 0 : (recovery_tune_max_util-recovery_tune_min_util)*
|
||||
(recovery_tune_max_client_util-rtune_client_util)/(recovery_tune_max_client_util-recovery_tune_min_client_util)
|
||||
)
|
||||
);
|
||||
recovery_target_queue_depth = (int)rtune_target_util + (rtune_target_util < 1 || rtune_target_util-(int)rtune_target_util >= 0.1 ? 1 : 0);
|
||||
// ideal_iops = 1s / real_latency
|
||||
// ;; target_iops = target_util * ideal_iops
|
||||
// => target_lat = target_queue * 1s / target_iops
|
||||
// => target_lat = target_queue / target_util * real_latency
|
||||
uint64_t target_lat = recovery_target_queue_depth/rtune_target_util * rtune_avg_lat;
|
||||
recovery_target_sleep_us = target_lat > rtune_avg_lat+recovery_tune_sleep_min_us ? target_lat-rtune_avg_lat : 0;
|
||||
if (log_level > 3)
|
||||
{
|
||||
printf(
|
||||
"recovery tune: client util %.2f (ewma %.2f), target util %.2f -> queue %ld, lat %lu us, real %lu us, pause %lu us\n",
|
||||
client_util, rtune_client_util, rtune_target_util, recovery_target_queue_depth, target_lat, rtune_avg_lat, recovery_target_sleep_us
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Just trigger write requests for degraded objects. They'll be recovered during writing
|
||||
bool osd_t::continue_recovery()
|
||||
{
|
||||
while (recovery_ops.size() < recovery_queue_depth)
|
||||
while (recovery_ops.size() < recovery_target_queue_depth)
|
||||
{
|
||||
osd_recovery_op_t op;
|
||||
if (pick_next_recovery(op))
|
||||
|
@@ -3,13 +3,15 @@
|
||||
|
||||
#include "osd_primary.h"
|
||||
|
||||
#define SELF_FD -1
|
||||
|
||||
void osd_t::autosync()
|
||||
{
|
||||
if (immediate_commit != IMMEDIATE_ALL && !autosync_op)
|
||||
{
|
||||
autosync_op = new osd_op_t();
|
||||
autosync_op->op_type = OSD_OP_IN;
|
||||
autosync_op->peer_fd = -1;
|
||||
autosync_op->peer_fd = SELF_FD;
|
||||
autosync_op->req = (osd_any_op_t){
|
||||
.sync = {
|
||||
.header = {
|
||||
@@ -85,9 +87,13 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval)
|
||||
cur_op->reply.hdr.id = cur_op->req.hdr.id;
|
||||
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
|
||||
cur_op->reply.hdr.retval = retval;
|
||||
if (cur_op->peer_fd == -1)
|
||||
if (cur_op->peer_fd == SELF_FD)
|
||||
{
|
||||
msgr.measure_exec(cur_op);
|
||||
// Do not include internal primary writes (recovery/rebalance) into client op statistics
|
||||
if (cur_op->req.hdr.opcode != OSD_OP_WRITE)
|
||||
{
|
||||
msgr.measure_exec(cur_op);
|
||||
}
|
||||
// Copy lambda to be unaffected by `delete op`
|
||||
std::function<void(osd_op_t*)>(cur_op->callback)(cur_op);
|
||||
}
|
||||
|
@@ -292,16 +292,27 @@ resume_7:
|
||||
{
|
||||
{
|
||||
int recovery_type = op_data->object_state->state & (OBJ_DEGRADED|OBJ_INCOMPLETE) ? 0 : 1;
|
||||
recovery_stat_count[0][recovery_type]++;
|
||||
if (!recovery_stat_count[0][recovery_type])
|
||||
recovery_stat[recovery_type].count++;
|
||||
if (!recovery_stat[recovery_type].count) // wrapped
|
||||
{
|
||||
recovery_stat_count[0][recovery_type]++;
|
||||
recovery_stat_bytes[0][recovery_type] = 0;
|
||||
memset(&recovery_print_prev[recovery_type], 0, sizeof(recovery_print_prev[recovery_type]));
|
||||
memset(&rtune_prev_recovery[recovery_type], 0, sizeof(rtune_prev_recovery[recovery_type]));
|
||||
memset(&recovery_stat[recovery_type], 0, sizeof(recovery_stat[recovery_type]));
|
||||
recovery_stat[recovery_type].count++;
|
||||
}
|
||||
for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); role++)
|
||||
{
|
||||
recovery_stat_bytes[0][recovery_type] += op_data->stripes[role].write_end - op_data->stripes[role].write_start;
|
||||
recovery_stat[recovery_type].bytes += op_data->stripes[role].write_end - op_data->stripes[role].write_start;
|
||||
}
|
||||
if (!cur_op->tv_end.tv_sec)
|
||||
{
|
||||
clock_gettime(CLOCK_REALTIME, &cur_op->tv_end);
|
||||
}
|
||||
uint64_t usec = (
|
||||
(cur_op->tv_end.tv_sec - cur_op->tv_begin.tv_sec)*1000000 +
|
||||
(cur_op->tv_end.tv_nsec - cur_op->tv_begin.tv_nsec)/1000
|
||||
);
|
||||
recovery_stat[recovery_type].usec += usec;
|
||||
}
|
||||
// Any kind of a non-clean object can have extra chunks, because we don't record objects
|
||||
// as degraded & misplaced or incomplete & misplaced at the same time. So try to remove extra chunks
|
||||
|
Reference in New Issue
Block a user