forked from vitalif/vitastor
Compare commits
6 Commits
Author | SHA1 | Date | |
---|---|---|---|
8810eae8fb | |||
c1365f46c9 | |||
14d6acbcba | |||
1e307069bc | |||
c3e80abad7 | |||
138ffe4032 |
@@ -2,6 +2,6 @@ cmake_minimum_required(VERSION 2.8)
|
||||
|
||||
project(vitastor)
|
||||
|
||||
set(VERSION "0.8.5")
|
||||
set(VERSION "0.8.6")
|
||||
|
||||
add_subdirectory(src)
|
||||
|
@@ -1,4 +1,4 @@
|
||||
VERSION ?= v0.8.5
|
||||
VERSION ?= v0.8.6
|
||||
|
||||
all: build push
|
||||
|
||||
|
@@ -49,7 +49,7 @@ spec:
|
||||
capabilities:
|
||||
add: ["SYS_ADMIN"]
|
||||
allowPrivilegeEscalation: true
|
||||
image: vitalif/vitastor-csi:v0.8.5
|
||||
image: vitalif/vitastor-csi:v0.8.6
|
||||
args:
|
||||
- "--node=$(NODE_ID)"
|
||||
- "--endpoint=$(CSI_ENDPOINT)"
|
||||
|
@@ -116,7 +116,7 @@ spec:
|
||||
privileged: true
|
||||
capabilities:
|
||||
add: ["SYS_ADMIN"]
|
||||
image: vitalif/vitastor-csi:v0.8.5
|
||||
image: vitalif/vitastor-csi:v0.8.6
|
||||
args:
|
||||
- "--node=$(NODE_ID)"
|
||||
- "--endpoint=$(CSI_ENDPOINT)"
|
||||
|
@@ -5,7 +5,7 @@ package vitastor
|
||||
|
||||
const (
|
||||
vitastorCSIDriverName = "csi.vitastor.io"
|
||||
vitastorCSIDriverVersion = "0.8.5"
|
||||
vitastorCSIDriverVersion = "0.8.6"
|
||||
)
|
||||
|
||||
// Config struct fills the parameters of request or user input
|
||||
|
@@ -10,7 +10,6 @@ import (
|
||||
"bytes"
|
||||
"strconv"
|
||||
"time"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"io/ioutil"
|
||||
@@ -21,8 +20,6 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
)
|
||||
|
||||
@@ -114,6 +111,34 @@ func GetConnectionParams(params map[string]string) (map[string]string, []string,
|
||||
return ctxVars, etcdUrl, etcdPrefix
|
||||
}
|
||||
|
||||
func invokeCLI(ctxVars map[string]string, args []string) ([]byte, error)
|
||||
{
|
||||
if (ctxVars["etcdUrl"] != "")
|
||||
{
|
||||
args = append(args, "--etcd_address", ctxVars["etcdUrl"])
|
||||
}
|
||||
if (ctxVars["etcdPrefix"] != "")
|
||||
{
|
||||
args = append(args, "--etcd_prefix", ctxVars["etcdPrefix"])
|
||||
}
|
||||
if (ctxVars["configPath"] != "")
|
||||
{
|
||||
args = append(args, "--config_path", ctxVars["configPath"])
|
||||
}
|
||||
c := exec.Command("/usr/bin/vitastor-cli", args...)
|
||||
var stdout, stderr bytes.Buffer
|
||||
c.Stdout = &stdout
|
||||
c.Stderr = &stderr
|
||||
err := c.Run()
|
||||
stderrStr := string(stderr.Bytes())
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("vitastor-cli %s failed: %s, status %s\n", strings.Join(args, " "), stderrStr, err)
|
||||
return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")")
|
||||
}
|
||||
return stdout.Bytes(), nil
|
||||
}
|
||||
|
||||
// Create the volume
|
||||
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error)
|
||||
{
|
||||
@@ -146,128 +171,41 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
volSize = ((capRange.GetRequiredBytes() + MB - 1) / MB) * MB
|
||||
}
|
||||
|
||||
// FIXME: The following should PROBABLY be implemented externally in a management tool
|
||||
|
||||
ctxVars, etcdUrl, etcdPrefix := GetConnectionParams(req.Parameters)
|
||||
ctxVars, etcdUrl, _ := GetConnectionParams(req.Parameters)
|
||||
if (len(etcdUrl) == 0)
|
||||
{
|
||||
return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf")
|
||||
}
|
||||
|
||||
// Connect to etcd
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
DialTimeout: ETCD_TIMEOUT,
|
||||
Endpoints: etcdUrl,
|
||||
})
|
||||
// Create image using vitastor-cli
|
||||
_, err := invokeCLI(ctxVars, []string{ "create", volName, "-s", string(volSize), "--pool", string(poolId) })
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error())
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
var imageId uint64 = 0
|
||||
for
|
||||
{
|
||||
// Check if the image exists
|
||||
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
||||
resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName)
|
||||
cancel()
|
||||
if (err != nil)
|
||||
if (strings.Index(err.Error(), "already exists") > 0)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
|
||||
}
|
||||
if (len(resp.Kvs) > 0)
|
||||
{
|
||||
kv := resp.Kvs[0]
|
||||
var v InodeIndex
|
||||
err := json.Unmarshal(kv.Value, &v)
|
||||
stat, err := invokeCLI(ctxVars, []string{ "ls", "--json", volName })
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error())
|
||||
return nil, err
|
||||
}
|
||||
poolId = v.PoolId
|
||||
imageId = v.Id
|
||||
inodeCfgKey := fmt.Sprintf("/config/inode/%d/%d", poolId, imageId)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
||||
resp, err := cli.Get(ctx, etcdPrefix+inodeCfgKey)
|
||||
cancel()
|
||||
var inodeCfg []InodeConfig
|
||||
err = json.Unmarshal(stat, &inodeCfg)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
|
||||
return nil, status.Error(codes.Internal, "Invalid JSON in vitastor-cli ls: "+err.Error())
|
||||
}
|
||||
if (len(resp.Kvs) == 0)
|
||||
if (len(inodeCfg) == 0)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "missing "+inodeCfgKey+" key in etcd")
|
||||
return nil, status.Error(codes.Internal, "vitastor-cli create said that image already exists, but ls can't find it")
|
||||
}
|
||||
var inodeCfg InodeConfig
|
||||
err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error())
|
||||
}
|
||||
if (inodeCfg.Size < uint64(volSize))
|
||||
if (inodeCfg[0].Size < uint64(volSize))
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "image "+volName+" is already created, but size is less than expected")
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Find a free ID
|
||||
// Create image metadata in a transaction verifying that the image doesn't exist yet AND ID is still free
|
||||
maxIdKey := fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
||||
resp, err := cli.Get(ctx, maxIdKey)
|
||||
cancel()
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
|
||||
}
|
||||
var modRev int64
|
||||
var nextId uint64
|
||||
if (len(resp.Kvs) > 0)
|
||||
{
|
||||
var err error
|
||||
nextId, err = strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, maxIdKey+" contains invalid ID")
|
||||
}
|
||||
modRev = resp.Kvs[0].ModRevision
|
||||
nextId++
|
||||
}
|
||||
else
|
||||
{
|
||||
nextId = 1
|
||||
}
|
||||
inodeIdxJson, _ := json.Marshal(InodeIndex{
|
||||
Id: nextId,
|
||||
PoolId: poolId,
|
||||
})
|
||||
inodeCfgJson, _ := json.Marshal(InodeConfig{
|
||||
Name: volName,
|
||||
Size: uint64(volSize),
|
||||
})
|
||||
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
||||
txnResp, err := cli.Txn(ctx).If(
|
||||
clientv3.Compare(clientv3.ModRevision(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId)), "=", modRev),
|
||||
clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)), "=", 0),
|
||||
clientv3.Compare(clientv3.CreateRevision(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId)), "=", 0),
|
||||
).Then(
|
||||
clientv3.OpPut(fmt.Sprintf("%s/index/maxid/%d", etcdPrefix, poolId), fmt.Sprintf("%d", nextId)),
|
||||
clientv3.OpPut(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName), string(inodeIdxJson)),
|
||||
clientv3.OpPut(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, poolId, nextId), string(inodeCfgJson)),
|
||||
).Commit()
|
||||
cancel()
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "failed to commit transaction in etcd: "+err.Error())
|
||||
}
|
||||
if (txnResp.Succeeded)
|
||||
{
|
||||
imageId = nextId
|
||||
break
|
||||
}
|
||||
// Start over if the transaction fails
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -299,97 +237,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
}
|
||||
volName := ctxVars["name"]
|
||||
|
||||
_, etcdUrl, etcdPrefix := GetConnectionParams(ctxVars)
|
||||
if (len(etcdUrl) == 0)
|
||||
{
|
||||
return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf")
|
||||
}
|
||||
ctxVars, _, _ = GetConnectionParams(ctxVars)
|
||||
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
DialTimeout: ETCD_TIMEOUT,
|
||||
Endpoints: etcdUrl,
|
||||
})
|
||||
_, err = invokeCLI(ctxVars, []string{ "rm", volName })
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "failed to connect to etcd at "+strings.Join(etcdUrl, ",")+": "+err.Error())
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
// Find inode by name
|
||||
ctx, cancel := context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
||||
resp, err := cli.Get(ctx, etcdPrefix+"/index/image/"+volName)
|
||||
cancel()
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
|
||||
}
|
||||
if (len(resp.Kvs) == 0)
|
||||
{
|
||||
return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist")
|
||||
}
|
||||
var idx InodeIndex
|
||||
err = json.Unmarshal(resp.Kvs[0].Value, &idx)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "invalid /index/image/"+volName+" key in etcd: "+err.Error())
|
||||
}
|
||||
|
||||
// Get inode config
|
||||
inodeCfgKey := fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id)
|
||||
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
||||
resp, err = cli.Get(ctx, inodeCfgKey)
|
||||
cancel()
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "failed to read key from etcd: "+err.Error())
|
||||
}
|
||||
if (len(resp.Kvs) == 0)
|
||||
{
|
||||
return nil, status.Error(codes.NotFound, "volume "+volName+" does not exist")
|
||||
}
|
||||
var inodeCfg InodeConfig
|
||||
err = json.Unmarshal(resp.Kvs[0].Value, &inodeCfg)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error())
|
||||
}
|
||||
|
||||
// Delete inode data by invoking vitastor-cli
|
||||
args := []string{
|
||||
"rm-data", "--etcd_address", strings.Join(etcdUrl, ","),
|
||||
"--pool", fmt.Sprintf("%d", idx.PoolId),
|
||||
"--inode", fmt.Sprintf("%d", idx.Id),
|
||||
}
|
||||
if (ctxVars["configPath"] != "")
|
||||
{
|
||||
args = append(args, "--config_path", ctxVars["configPath"])
|
||||
}
|
||||
c := exec.Command("/usr/bin/vitastor-cli", args...)
|
||||
var stderr bytes.Buffer
|
||||
c.Stdout = nil
|
||||
c.Stderr = &stderr
|
||||
err = c.Run()
|
||||
stderrStr := string(stderr.Bytes())
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("vitastor-cli rm-data failed: %s, status %s\n", stderrStr, err)
|
||||
return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")")
|
||||
}
|
||||
|
||||
// Delete inode config in etcd
|
||||
ctx, cancel = context.WithTimeout(context.Background(), ETCD_TIMEOUT)
|
||||
txnResp, err := cli.Txn(ctx).Then(
|
||||
clientv3.OpDelete(fmt.Sprintf("%s/index/image/%s", etcdPrefix, volName)),
|
||||
clientv3.OpDelete(fmt.Sprintf("%s/config/inode/%d/%d", etcdPrefix, idx.PoolId, idx.Id)),
|
||||
).Commit()
|
||||
cancel()
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "failed to delete keys in etcd: "+err.Error())
|
||||
}
|
||||
if (!txnResp.Succeeded)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "failed to delete keys in etcd: transaction failed")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
|
4
debian/changelog
vendored
4
debian/changelog
vendored
@@ -1,10 +1,10 @@
|
||||
vitastor (0.8.5-1) unstable; urgency=medium
|
||||
vitastor (0.8.6-1) unstable; urgency=medium
|
||||
|
||||
* Bugfixes
|
||||
|
||||
-- Vitaliy Filippov <vitalif@yourcmc.ru> Fri, 03 Jun 2022 02:09:44 +0300
|
||||
|
||||
vitastor (0.8.5-1) unstable; urgency=medium
|
||||
vitastor (0.8.6-1) unstable; urgency=medium
|
||||
|
||||
* Implement NFS proxy
|
||||
* Add documentation
|
||||
|
8
debian/vitastor.Dockerfile
vendored
8
debian/vitastor.Dockerfile
vendored
@@ -34,8 +34,8 @@ RUN set -e -x; \
|
||||
mkdir -p /root/packages/vitastor-$REL; \
|
||||
rm -rf /root/packages/vitastor-$REL/*; \
|
||||
cd /root/packages/vitastor-$REL; \
|
||||
cp -r /root/vitastor vitastor-0.8.5; \
|
||||
cd vitastor-0.8.5; \
|
||||
cp -r /root/vitastor vitastor-0.8.6; \
|
||||
cd vitastor-0.8.6; \
|
||||
ln -s /root/fio-build/fio-*/ ./fio; \
|
||||
FIO=$(head -n1 fio/debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
|
||||
ls /usr/include/linux/raw.h || cp ./debian/raw.h /usr/include/linux/raw.h; \
|
||||
@@ -48,8 +48,8 @@ RUN set -e -x; \
|
||||
rm -rf a b; \
|
||||
echo "dep:fio=$FIO" > debian/fio_version; \
|
||||
cd /root/packages/vitastor-$REL; \
|
||||
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.8.5.orig.tar.xz vitastor-0.8.5; \
|
||||
cd vitastor-0.8.5; \
|
||||
tar --sort=name --mtime='2020-01-01' --owner=0 --group=0 --exclude=debian -cJf vitastor_0.8.6.orig.tar.xz vitastor-0.8.6; \
|
||||
cd vitastor-0.8.6; \
|
||||
V=$(head -n1 debian/changelog | perl -pe 's/^.*\((.*?)\).*$/$1/'); \
|
||||
DEBFULLNAME="Vitaliy Filippov <vitalif@yourcmc.ru>" dch -D $REL -v "$V""$REL" "Rebuild for $REL"; \
|
||||
DEB_BUILD_OPTIONS=nocheck dpkg-buildpackage --jobs=auto -sa; \
|
||||
|
@@ -19,6 +19,7 @@ between clients, OSDs and etcd.
|
||||
- [rdma_max_sge](#rdma_max_sge)
|
||||
- [rdma_max_msg](#rdma_max_msg)
|
||||
- [rdma_max_recv](#rdma_max_recv)
|
||||
- [rdma_max_send](#rdma_max_send)
|
||||
- [peer_connect_interval](#peer_connect_interval)
|
||||
- [peer_connect_timeout](#peer_connect_timeout)
|
||||
- [osd_idle_timeout](#osd_idle_timeout)
|
||||
@@ -74,6 +75,12 @@ to work. For example, Mellanox ConnectX-3 and older adapters don't have
|
||||
Implicit ODP, so they're unsupported by Vitastor. Run `ibv_devinfo -v` as
|
||||
root to list available RDMA devices and their features.
|
||||
|
||||
Remember that you also have to configure your network switches if you use
|
||||
RoCE/RoCEv2, otherwise you may experience unstable performance. Refer to
|
||||
the manual of your network vendor for details about setting up the switch
|
||||
for RoCEv2 correctly. Usually it means setting up Lossless Ethernet with
|
||||
PFC (Priority Flow Control) and ECN (Explicit Congestion Notification).
|
||||
|
||||
## rdma_port_num
|
||||
|
||||
- Type: integer
|
||||
@@ -116,20 +123,30 @@ required to change this parameter.
|
||||
## rdma_max_msg
|
||||
|
||||
- Type: integer
|
||||
- Default: 1048576
|
||||
- Default: 132096
|
||||
|
||||
Maximum size of a single RDMA send or receive operation in bytes.
|
||||
|
||||
## rdma_max_recv
|
||||
|
||||
- Type: integer
|
||||
- Default: 16
|
||||
|
||||
Maximum number of RDMA receive buffers per connection (RDMA requires
|
||||
preallocated buffers to receive data). Each buffer is `rdma_max_msg` bytes
|
||||
in size. So this setting directly affects memory usage: a single Vitastor
|
||||
RDMA client uses `rdma_max_recv * rdma_max_msg * OSD_COUNT` bytes of memory.
|
||||
Default is roughly 2 MB * number of OSDs.
|
||||
|
||||
## rdma_max_send
|
||||
|
||||
- Type: integer
|
||||
- Default: 8
|
||||
|
||||
Maximum number of parallel RDMA receive operations. Note that this number
|
||||
of receive buffers `rdma_max_msg` in size are allocated for each client,
|
||||
so this setting actually affects memory usage. This is because RDMA receive
|
||||
operations are (sadly) still not zero-copy in Vitastor. It may be fixed in
|
||||
later versions.
|
||||
Maximum number of outstanding RDMA send operations per connection. Should be
|
||||
less than `rdma_max_recv` so the receiving side doesn't run out of buffers.
|
||||
Doesn't affect memory usage - additional memory isn't allocated for send
|
||||
operations.
|
||||
|
||||
## peer_connect_interval
|
||||
|
||||
|
@@ -19,6 +19,7 @@
|
||||
- [rdma_max_sge](#rdma_max_sge)
|
||||
- [rdma_max_msg](#rdma_max_msg)
|
||||
- [rdma_max_recv](#rdma_max_recv)
|
||||
- [rdma_max_send](#rdma_max_send)
|
||||
- [peer_connect_interval](#peer_connect_interval)
|
||||
- [peer_connect_timeout](#peer_connect_timeout)
|
||||
- [osd_idle_timeout](#osd_idle_timeout)
|
||||
@@ -78,6 +79,13 @@ Implicit On-Demand Paging (Implicit ODP) и Scatter/Gather (SG). Наприме
|
||||
суперпользователя, чтобы посмотреть список доступных RDMA-устройств, их
|
||||
параметры и возможности.
|
||||
|
||||
Обратите внимание, что если вы используете RoCE/RoCEv2, вам также необходимо
|
||||
правильно настроить для него коммутаторы, иначе вы можете столкнуться с
|
||||
нестабильной производительностью. Подробную информацию о настройке
|
||||
коммутатора для RoCEv2 ищите в документации производителя. Обычно это
|
||||
подразумевает настройку сети без потерь на основе PFC (Priority Flow
|
||||
Control) и ECN (Explicit Congestion Notification).
|
||||
|
||||
## rdma_port_num
|
||||
|
||||
- Тип: целое число
|
||||
@@ -121,22 +129,32 @@ OSD в любом случае согласовывают реальное зн
|
||||
## rdma_max_msg
|
||||
|
||||
- Тип: целое число
|
||||
- Значение по умолчанию: 1048576
|
||||
- Значение по умолчанию: 132096
|
||||
|
||||
Максимальный размер одной RDMA-операции отправки или приёма.
|
||||
|
||||
## rdma_max_recv
|
||||
|
||||
- Тип: целое число
|
||||
- Значение по умолчанию: 16
|
||||
|
||||
Максимальное число буферов для RDMA-приёма данных на одно соединение
|
||||
(RDMA требует заранее выделенных буферов для приёма данных). Каждый буфер
|
||||
имеет размер `rdma_max_msg` байт. Таким образом, настройка прямо влияет на
|
||||
потребление памяти - один Vitastor-клиент с RDMA использует
|
||||
`rdma_max_recv * rdma_max_msg * ЧИСЛО_OSD` байт памяти, по умолчанию -
|
||||
примерно 2 МБ * число OSD.
|
||||
|
||||
## rdma_max_send
|
||||
|
||||
- Тип: целое число
|
||||
- Значение по умолчанию: 8
|
||||
|
||||
Максимальное число параллельных RDMA-операций получения данных. Следует
|
||||
иметь в виду, что данное число буферов размером `rdma_max_msg` выделяется
|
||||
для каждого подключённого клиентского соединения, так что данная настройка
|
||||
влияет на потребление памяти. Это так потому, что RDMA-приём данных в
|
||||
Vitastor, увы, всё равно не является zero-copy, т.е. всё равно 1 раз
|
||||
копирует данные в памяти. Данная особенность, возможно, будет исправлена в
|
||||
более новых версиях Vitastor.
|
||||
Максимальное число RDMA-операций отправки, отправляемых в очередь одного
|
||||
соединения. Желательно, чтобы оно было меньше `rdma_max_recv`, чтобы
|
||||
у принимающей стороны в процессе работы не заканчивались буферы на приём.
|
||||
Не влияет на потребление памяти - дополнительная память на операции отправки
|
||||
не выделяется.
|
||||
|
||||
## peer_connect_interval
|
||||
|
||||
|
@@ -53,6 +53,12 @@
|
||||
to work. For example, Mellanox ConnectX-3 and older adapters don't have
|
||||
Implicit ODP, so they're unsupported by Vitastor. Run `ibv_devinfo -v` as
|
||||
root to list available RDMA devices and their features.
|
||||
|
||||
Remember that you also have to configure your network switches if you use
|
||||
RoCE/RoCEv2, otherwise you may experience unstable performance. Refer to
|
||||
the manual of your network vendor for details about setting up the switch
|
||||
for RoCEv2 correctly. Usually it means setting up Lossless Ethernet with
|
||||
PFC (Priority Flow Control) and ECN (Explicit Congestion Notification).
|
||||
info_ru: |
|
||||
Название RDMA-устройства для связи с Vitastor OSD (например, "rocep5s0f0").
|
||||
Имейте в виду, что поддержка RDMA в Vitastor требует функций устройства
|
||||
@@ -61,6 +67,13 @@
|
||||
потому не поддерживаются в Vitastor. Запустите `ibv_devinfo -v` от имени
|
||||
суперпользователя, чтобы посмотреть список доступных RDMA-устройств, их
|
||||
параметры и возможности.
|
||||
|
||||
Обратите внимание, что если вы используете RoCE/RoCEv2, вам также необходимо
|
||||
правильно настроить для него коммутаторы, иначе вы можете столкнуться с
|
||||
нестабильной производительностью. Подробную информацию о настройке
|
||||
коммутатора для RoCEv2 ищите в документации производителя. Обычно это
|
||||
подразумевает настройку сети без потерь на основе PFC (Priority Flow
|
||||
Control) и ECN (Explicit Congestion Notification).
|
||||
- name: rdma_port_num
|
||||
type: int
|
||||
default: 1
|
||||
@@ -114,26 +127,39 @@
|
||||
так что менять этот параметр обычно не нужно.
|
||||
- name: rdma_max_msg
|
||||
type: int
|
||||
default: 1048576
|
||||
default: 132096
|
||||
info: Maximum size of a single RDMA send or receive operation in bytes.
|
||||
info_ru: Максимальный размер одной RDMA-операции отправки или приёма.
|
||||
- name: rdma_max_recv
|
||||
type: int
|
||||
default: 16
|
||||
info: |
|
||||
Maximum number of RDMA receive buffers per connection (RDMA requires
|
||||
preallocated buffers to receive data). Each buffer is `rdma_max_msg` bytes
|
||||
in size. So this setting directly affects memory usage: a single Vitastor
|
||||
RDMA client uses `rdma_max_recv * rdma_max_msg * OSD_COUNT` bytes of memory.
|
||||
Default is roughly 2 MB * number of OSDs.
|
||||
info_ru: |
|
||||
Максимальное число буферов для RDMA-приёма данных на одно соединение
|
||||
(RDMA требует заранее выделенных буферов для приёма данных). Каждый буфер
|
||||
имеет размер `rdma_max_msg` байт. Таким образом, настройка прямо влияет на
|
||||
потребление памяти - один Vitastor-клиент с RDMA использует
|
||||
`rdma_max_recv * rdma_max_msg * ЧИСЛО_OSD` байт памяти, по умолчанию -
|
||||
примерно 2 МБ * число OSD.
|
||||
- name: rdma_max_send
|
||||
type: int
|
||||
default: 8
|
||||
info: |
|
||||
Maximum number of parallel RDMA receive operations. Note that this number
|
||||
of receive buffers `rdma_max_msg` in size are allocated for each client,
|
||||
so this setting actually affects memory usage. This is because RDMA receive
|
||||
operations are (sadly) still not zero-copy in Vitastor. It may be fixed in
|
||||
later versions.
|
||||
Maximum number of outstanding RDMA send operations per connection. Should be
|
||||
less than `rdma_max_recv` so the receiving side doesn't run out of buffers.
|
||||
Doesn't affect memory usage - additional memory isn't allocated for send
|
||||
operations.
|
||||
info_ru: |
|
||||
Максимальное число параллельных RDMA-операций получения данных. Следует
|
||||
иметь в виду, что данное число буферов размером `rdma_max_msg` выделяется
|
||||
для каждого подключённого клиентского соединения, так что данная настройка
|
||||
влияет на потребление памяти. Это так потому, что RDMA-приём данных в
|
||||
Vitastor, увы, всё равно не является zero-copy, т.е. всё равно 1 раз
|
||||
копирует данные в памяти. Данная особенность, возможно, будет исправлена в
|
||||
более новых версиях Vitastor.
|
||||
Максимальное число RDMA-операций отправки, отправляемых в очередь одного
|
||||
соединения. Желательно, чтобы оно было меньше `rdma_max_recv`, чтобы
|
||||
у принимающей стороны в процессе работы не заканчивались буферы на приём.
|
||||
Не влияет на потребление памяти - дополнительная память на операции отправки
|
||||
не выделяется.
|
||||
- name: peer_connect_interval
|
||||
type: sec
|
||||
min: 1
|
||||
|
@@ -1,4 +1,4 @@
|
||||
[Documentation](../../README.md#documentation) → Usage → Disk Tool
|
||||
[Documentation](../../README.md#documentation) → Usage → Disk management tool
|
||||
|
||||
-----
|
||||
|
||||
|
@@ -1,4 +1,4 @@
|
||||
[Документация](../../README-ru.md#документация) → Использование → Управление дисками
|
||||
[Документация](../../README-ru.md#документация) → Использование → Инструмент управления дисками
|
||||
|
||||
-----
|
||||
|
||||
|
@@ -70,9 +70,9 @@ const etcd_tree = {
|
||||
rdma_gid_index: 0,
|
||||
rdma_mtu: 4096,
|
||||
rdma_max_sge: 128,
|
||||
rdma_max_send: 32,
|
||||
rdma_max_recv: 8,
|
||||
rdma_max_msg: 1048576,
|
||||
rdma_max_send: 8,
|
||||
rdma_max_recv: 16,
|
||||
rdma_max_msg: 132096,
|
||||
log_level: 0,
|
||||
block_size: 131072,
|
||||
disk_alignment: 4096,
|
||||
|
@@ -50,7 +50,7 @@ from cinder.volume import configuration
|
||||
from cinder.volume import driver
|
||||
from cinder.volume import volume_utils
|
||||
|
||||
VERSION = '0.8.5'
|
||||
VERSION = '0.8.6'
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@@ -25,4 +25,4 @@ rm fio
|
||||
mv fio-copy fio
|
||||
FIO=`rpm -qi fio | perl -e 'while(<>) { /^Epoch[\s:]+(\S+)/ && print "$1:"; /^Version[\s:]+(\S+)/ && print $1; /^Release[\s:]+(\S+)/ && print "-$1"; }'`
|
||||
perl -i -pe 's/(Requires:\s*fio)([^\n]+)?/$1 = '$FIO'/' $VITASTOR/rpm/vitastor-el$EL.spec
|
||||
tar --transform 's#^#vitastor-0.8.5/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.8.5$(rpm --eval '%dist').tar.gz *
|
||||
tar --transform 's#^#vitastor-0.8.6/#' --exclude 'rpm/*.rpm' -czf $VITASTOR/../vitastor-0.8.6$(rpm --eval '%dist').tar.gz *
|
||||
|
@@ -35,7 +35,7 @@ ADD . /root/vitastor
|
||||
RUN set -e; \
|
||||
cd /root/vitastor/rpm; \
|
||||
sh build-tarball.sh; \
|
||||
cp /root/vitastor-0.8.5.el7.tar.gz ~/rpmbuild/SOURCES; \
|
||||
cp /root/vitastor-0.8.6.el7.tar.gz ~/rpmbuild/SOURCES; \
|
||||
cp vitastor-el7.spec ~/rpmbuild/SPECS/vitastor.spec; \
|
||||
cd ~/rpmbuild/SPECS/; \
|
||||
rpmbuild -ba vitastor.spec; \
|
||||
|
@@ -1,11 +1,11 @@
|
||||
Name: vitastor
|
||||
Version: 0.8.5
|
||||
Version: 0.8.6
|
||||
Release: 1%{?dist}
|
||||
Summary: Vitastor, a fast software-defined clustered block storage
|
||||
|
||||
License: Vitastor Network Public License 1.1
|
||||
URL: https://vitastor.io/
|
||||
Source0: vitastor-0.8.5.el7.tar.gz
|
||||
Source0: vitastor-0.8.6.el7.tar.gz
|
||||
|
||||
BuildRequires: liburing-devel >= 0.6
|
||||
BuildRequires: gperftools-devel
|
||||
|
@@ -35,7 +35,7 @@ ADD . /root/vitastor
|
||||
RUN set -e; \
|
||||
cd /root/vitastor/rpm; \
|
||||
sh build-tarball.sh; \
|
||||
cp /root/vitastor-0.8.5.el8.tar.gz ~/rpmbuild/SOURCES; \
|
||||
cp /root/vitastor-0.8.6.el8.tar.gz ~/rpmbuild/SOURCES; \
|
||||
cp vitastor-el8.spec ~/rpmbuild/SPECS/vitastor.spec; \
|
||||
cd ~/rpmbuild/SPECS/; \
|
||||
rpmbuild -ba vitastor.spec; \
|
||||
|
@@ -1,11 +1,11 @@
|
||||
Name: vitastor
|
||||
Version: 0.8.5
|
||||
Version: 0.8.6
|
||||
Release: 1%{?dist}
|
||||
Summary: Vitastor, a fast software-defined clustered block storage
|
||||
|
||||
License: Vitastor Network Public License 1.1
|
||||
URL: https://vitastor.io/
|
||||
Source0: vitastor-0.8.5.el8.tar.gz
|
||||
Source0: vitastor-0.8.6.el8.tar.gz
|
||||
|
||||
BuildRequires: liburing-devel >= 0.6
|
||||
BuildRequires: gperftools-devel
|
||||
|
@@ -16,7 +16,7 @@ if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
|
||||
set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR}")
|
||||
endif()
|
||||
|
||||
add_definitions(-DVERSION="0.8.5")
|
||||
add_definitions(-DVERSION="0.8.6")
|
||||
add_definitions(-Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fdiagnostics-color=always -I ${CMAKE_SOURCE_DIR}/src)
|
||||
if (${WITH_ASAN})
|
||||
add_definitions(-fsanitize=address -fno-omit-frame-pointer)
|
||||
@@ -98,7 +98,7 @@ endif (${WITH_FIO})
|
||||
# libvitastor_common.a
|
||||
set(MSGR_RDMA "")
|
||||
if (IBVERBS_LIBRARIES)
|
||||
set(MSGR_RDMA msgr_rdma.cpp freelist.cpp allocator.cpp)
|
||||
set(MSGR_RDMA "msgr_rdma.cpp")
|
||||
endif (IBVERBS_LIBRARIES)
|
||||
add_library(vitastor_common STATIC
|
||||
epoll_manager.cpp etcd_state_client.cpp messenger.cpp addr_util.cpp
|
||||
@@ -278,11 +278,6 @@ add_executable(test_allocator EXCLUDE_FROM_ALL test_allocator.cpp allocator.cpp)
|
||||
add_dependencies(build_tests test_allocator)
|
||||
add_test(NAME test_allocator COMMAND test_allocator)
|
||||
|
||||
# test_freelist
|
||||
add_executable(test_freelist EXCLUDE_FROM_ALL test_freelist.cpp)
|
||||
add_dependencies(build_tests test_freelist)
|
||||
add_test(NAME test_freelist COMMAND test_freelist)
|
||||
|
||||
# test_cas
|
||||
add_executable(test_cas
|
||||
test_cas.cpp
|
||||
|
@@ -1,63 +0,0 @@
|
||||
// Copyright (c) Vitaliy Filippov, 2023+
|
||||
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
|
||||
|
||||
#include <assert.h>
|
||||
#include "freelist.h"
|
||||
|
||||
uint64_t freelist_allocator_t::alloc(uint64_t data_size)
|
||||
{
|
||||
for (int i = 0; i < freelist.size(); i++)
|
||||
{
|
||||
if (freelist[i].size >= data_size)
|
||||
{
|
||||
uint64_t r = freelist[i].start;
|
||||
freelist[i].start += data_size;
|
||||
freelist[i].size -= data_size;
|
||||
return r;
|
||||
}
|
||||
}
|
||||
return UINT64_MAX;
|
||||
}
|
||||
|
||||
void freelist_allocator_t::free(uint64_t start, uint64_t size)
|
||||
{
|
||||
int min = 0, max = freelist.size();
|
||||
if (max && freelist[freelist.size()-1].start < start)
|
||||
{
|
||||
min = max;
|
||||
}
|
||||
if (max && freelist[0].start >= start)
|
||||
{
|
||||
max = 0;
|
||||
}
|
||||
while (max-min > 1)
|
||||
{
|
||||
int mid = (min+max)/2;
|
||||
if (freelist[mid].start >= start)
|
||||
max = mid;
|
||||
else
|
||||
min = mid;
|
||||
}
|
||||
// max = the first item where freelist[max].start >= start
|
||||
if (max > 0 && freelist[max-1].start+freelist[max-1].size >= start)
|
||||
{
|
||||
assert(freelist[max-1].start+freelist[max-1].size == start);
|
||||
freelist[max-1].size += size;
|
||||
}
|
||||
else if (max < freelist.size() && freelist[max].start <= size+start)
|
||||
{
|
||||
assert(freelist[max].start == size+start);
|
||||
freelist[max].start -= size;
|
||||
freelist[max].size += size;
|
||||
}
|
||||
else
|
||||
{
|
||||
freelist.insert(freelist.begin()+min, (freelist_item_t){ .start = start, .size = size });
|
||||
max = min; // to skip the if below
|
||||
}
|
||||
if (min != max && max < freelist.size() && freelist[max].start == freelist[min].start+freelist[min].size)
|
||||
{
|
||||
freelist[min].size += freelist[max].size;
|
||||
freelist.erase(freelist.begin()+max, freelist.begin()+max+1);
|
||||
}
|
||||
}
|
@@ -1,23 +0,0 @@
|
||||
// Copyright (c) Vitaliy Filippov, 2023+
|
||||
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
#include <vector>
|
||||
|
||||
struct freelist_item_t
|
||||
{
|
||||
uint64_t start, size;
|
||||
};
|
||||
|
||||
// Really trivial freelist allocator
|
||||
// Should be fine for remote RDMA memory management because
|
||||
// most of the time fragmentation shouldn't be an issue as all
|
||||
// memory regions are short-lived
|
||||
struct freelist_allocator_t
|
||||
{
|
||||
std::vector<freelist_item_t> freelist;
|
||||
uint64_t alloc(uint64_t data_size);
|
||||
void free(uint64_t start, uint64_t size);
|
||||
};
|
@@ -157,16 +157,13 @@ void osd_messenger_t::parse_config(const json11::Json & config)
|
||||
this->rdma_max_sge = 128;
|
||||
this->rdma_max_send = config["rdma_max_send"].uint64_value();
|
||||
if (!this->rdma_max_send)
|
||||
this->rdma_max_send = 128;
|
||||
this->rdma_max_send = 8;
|
||||
this->rdma_max_recv = config["rdma_max_recv"].uint64_value();
|
||||
if (!this->rdma_max_recv)
|
||||
this->rdma_max_recv = 128;
|
||||
this->rdma_op_slots = config["rdma_op_slots"].uint64_value();
|
||||
if (!this->rdma_op_slots || this->rdma_op_slots >= 1024*1024)
|
||||
this->rdma_op_slots = 4096;
|
||||
this->rdma_op_memory = config["rdma_op_memory"].uint64_value();
|
||||
if (!this->rdma_op_memory || this->rdma_op_memory >= 1024*1024*1024)
|
||||
this->rdma_op_memory = 16*1024*1024;
|
||||
this->rdma_max_recv = 16;
|
||||
this->rdma_max_msg = config["rdma_max_msg"].uint64_value();
|
||||
if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024)
|
||||
this->rdma_max_msg = 129*1024;
|
||||
#endif
|
||||
this->receive_buffer_size = (uint32_t)config["tcp_header_buffer_size"].uint64_value();
|
||||
if (!this->receive_buffer_size || this->receive_buffer_size > 1024*1024*1024)
|
||||
@@ -391,16 +388,12 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
||||
#ifdef WITH_RDMA
|
||||
if (rdma_context)
|
||||
{
|
||||
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_op_slots, rdma_op_memory);
|
||||
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg);
|
||||
if (cl->rdma_conn)
|
||||
{
|
||||
clients_by_qp[cl->rdma_conn->qp->qp_num] = cl->peer_fd;
|
||||
json11::Json payload = json11::Json::object {
|
||||
{ "connect_rdma", cl->rdma_conn->addr.to_string() },
|
||||
{ "rdma_data_rkey", (uint64_t)cl->rdma_conn->in_data_mr->rkey },
|
||||
{ "rdma_op_rkey", (uint64_t)cl->rdma_conn->in_op_mr->rkey },
|
||||
{ "rdma_op_slots", cl->rdma_conn->op_slots },
|
||||
{ "rdma_op_memory", cl->rdma_conn->op_memory },
|
||||
{ "rdma_max_msg", cl->rdma_conn->max_msg },
|
||||
};
|
||||
std::string payload_str = payload.dump();
|
||||
op->req.show_conf.json_len = payload_str.size();
|
||||
@@ -460,14 +453,12 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
||||
{
|
||||
msgr_rdma_address_t addr;
|
||||
if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) ||
|
||||
config["rdma_op_memory"].uint64_value() == 0 ||
|
||||
cl->rdma_conn->connect(&addr) != 0)
|
||||
{
|
||||
fprintf(
|
||||
stderr, "Failed to connect to OSD %lu (address %s) using RDMA\n",
|
||||
cl->osd_num, config["rdma_address"].string_value().c_str()
|
||||
);
|
||||
clients_by_qp.erase(cl->rdma_conn->qp->qp_num);
|
||||
delete cl->rdma_conn;
|
||||
cl->rdma_conn = NULL;
|
||||
// FIXME: Keep TCP connection in this case
|
||||
@@ -479,12 +470,11 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
|
||||
}
|
||||
else
|
||||
{
|
||||
cl->rdma_conn->set_out_capacity(
|
||||
config["rdma_data_rkey"].uint64_value(),
|
||||
config["rdma_op_rkey"].uint64_value(),
|
||||
config["rdma_op_slots"].uint64_value(),
|
||||
config["rdma_op_memory"].uint64_value()
|
||||
);
|
||||
uint64_t server_max_msg = config["rdma_max_msg"].uint64_value();
|
||||
if (cl->rdma_conn->max_msg > server_max_msg)
|
||||
{
|
||||
cl->rdma_conn->max_msg = server_max_msg;
|
||||
}
|
||||
if (log_level > 0)
|
||||
{
|
||||
fprintf(stderr, "Connected to OSD %lu using RDMA\n", cl->osd_num);
|
||||
|
@@ -37,7 +37,6 @@
|
||||
|
||||
#define MSGR_SENDP_HDR 1
|
||||
#define MSGR_SENDP_FREE 2
|
||||
#define MSGR_SENDP_LAST 4
|
||||
|
||||
struct msgr_sendp_t
|
||||
{
|
||||
@@ -132,10 +131,9 @@ protected:
|
||||
bool use_rdma = true;
|
||||
std::string rdma_device;
|
||||
uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0;
|
||||
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
|
||||
uint64_t rdma_op_slots = 0, rdma_op_memory = 0;
|
||||
msgr_rdma_context_t *rdma_context = NULL;
|
||||
std::map<uint32_t, int> clients_by_qp;
|
||||
uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0;
|
||||
uint64_t rdma_max_msg = 0;
|
||||
#endif
|
||||
|
||||
std::vector<int> read_ready_clients;
|
||||
@@ -172,8 +170,7 @@ public:
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
bool is_rdma_enabled();
|
||||
bool connect_rdma(int peer_fd, std::string rdma_address,
|
||||
uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory);
|
||||
bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg);
|
||||
#endif
|
||||
|
||||
protected:
|
||||
@@ -194,13 +191,12 @@ protected:
|
||||
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
|
||||
bool handle_finished_read(osd_client_t *cl);
|
||||
void handle_op_hdr(osd_client_t *cl);
|
||||
bool handle_reply_hdr(void *reply_hdr, osd_client_t *cl);
|
||||
bool handle_reply_hdr(osd_client_t *cl);
|
||||
void handle_reply_ready(osd_op_t *op);
|
||||
|
||||
#ifdef WITH_RDMA
|
||||
bool try_send_rdma(osd_client_t *cl);
|
||||
bool try_recv_rdma(osd_client_t *cl);
|
||||
void handle_rdma_events();
|
||||
bool rdma_handle_op(osd_client_t *cl, uint32_t op_slot);
|
||||
#endif
|
||||
};
|
||||
|
@@ -46,20 +46,9 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t()
|
||||
ctx->used_max_cqe -= max_send+max_recv;
|
||||
if (qp)
|
||||
ibv_destroy_qp(qp);
|
||||
if (in_data_mr)
|
||||
ibv_dereg_mr(in_data_mr);
|
||||
if (in_op_mr)
|
||||
ibv_dereg_mr(in_op_mr);
|
||||
if (in_data_buf)
|
||||
free(in_data_buf);
|
||||
if (in_ops)
|
||||
free(in_ops);
|
||||
if (out_op_alloc)
|
||||
delete out_op_alloc;
|
||||
if (out_slot_data)
|
||||
free(out_slot_data);
|
||||
if (out_slot_ops)
|
||||
free(out_slot_ops);
|
||||
if (recv_buffers.size())
|
||||
for (auto b: recv_buffers)
|
||||
free(b);
|
||||
}
|
||||
|
||||
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level)
|
||||
@@ -160,7 +149,7 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t
|
||||
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
|
||||
if (!ctx->mr)
|
||||
{
|
||||
fprintf(stderr, "Couldn't register global RDMA memory region: %s\n", strerror(errno));
|
||||
fprintf(stderr, "Couldn't register RDMA memory region\n");
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
@@ -191,7 +180,7 @@ cleanup:
|
||||
}
|
||||
|
||||
msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send,
|
||||
uint32_t max_recv, uint32_t max_sge, uint64_t op_slots, uint64_t op_memory)
|
||||
uint32_t max_recv, uint32_t max_sge, uint32_t max_msg)
|
||||
{
|
||||
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
|
||||
|
||||
@@ -201,6 +190,7 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
|
||||
conn->max_send = max_send;
|
||||
conn->max_recv = max_recv;
|
||||
conn->max_sge = max_sge;
|
||||
conn->max_msg = max_msg;
|
||||
|
||||
ctx->used_max_cqe += max_send+max_recv;
|
||||
if (ctx->used_max_cqe > ctx->max_cqe)
|
||||
@@ -221,30 +211,6 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
|
||||
ctx->max_cqe = new_max_cqe;
|
||||
}
|
||||
|
||||
conn->op_memory = op_memory;
|
||||
conn->in_data_buf = memalign_or_die(MEM_ALIGNMENT, op_memory);
|
||||
conn->in_data_mr = ibv_reg_mr(ctx->pd, conn->in_data_buf, op_memory,
|
||||
IBV_ACCESS_ZERO_BASED | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_ON_DEMAND);
|
||||
if (!conn->in_data_mr)
|
||||
{
|
||||
fprintf(stderr, "Couldn't register %lu MB RDMA memory region for incoming data: %s\n",
|
||||
(op_memory+1024*1024-1)/1024/1024, strerror(errno));
|
||||
delete conn;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
conn->op_slots = op_slots;
|
||||
conn->in_ops = (msgr_rdma_cmd_t *)malloc_or_die(sizeof(msgr_rdma_cmd_t) * op_slots);
|
||||
conn->in_op_mr = ibv_reg_mr(ctx->pd, conn->in_ops, sizeof(msgr_rdma_cmd_t) * op_slots,
|
||||
IBV_ACCESS_ZERO_BASED | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_ON_DEMAND);
|
||||
if (!conn->in_op_mr)
|
||||
{
|
||||
fprintf(stderr, "Couldn't register %lu KB RDMA memory region for incoming operation headers: %s\n",
|
||||
(sizeof(msgr_rdma_cmd_t) * op_slots + 1023)/1024, strerror(errno));
|
||||
delete conn;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ibv_qp_init_attr init_attr = {
|
||||
.send_cq = ctx->cq,
|
||||
.recv_cq = ctx->cq,
|
||||
@@ -271,7 +237,7 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx,
|
||||
|
||||
ibv_qp_attr attr = {
|
||||
.qp_state = IBV_QPS_INIT,
|
||||
.qp_access_flags = IBV_ACCESS_REMOTE_WRITE,
|
||||
.qp_access_flags = 0,
|
||||
.pkey_index = 0,
|
||||
.port_num = ctx->ib_port,
|
||||
};
|
||||
@@ -299,19 +265,6 @@ static ibv_mtu mtu_to_ibv_mtu(uint32_t mtu)
|
||||
return IBV_MTU_4096;
|
||||
}
|
||||
|
||||
void msgr_rdma_connection_t::set_out_capacity(uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory)
|
||||
{
|
||||
assert(!out_op_alloc);
|
||||
this->out_data_rkey = out_data_rkey;
|
||||
this->out_op_rkey = out_op_rkey;
|
||||
this->out_op_slots = out_op_slots;
|
||||
this->out_op_memory = out_op_memory;
|
||||
out_op_alloc = new allocator(out_op_slots);
|
||||
out_data_alloc.free(0, out_op_memory);
|
||||
out_slot_data = (msgr_rdma_out_pos_t *)malloc_or_die(sizeof(msgr_rdma_out_pos_t) * out_op_slots);
|
||||
out_slot_ops = (osd_op_t **)malloc_or_die(sizeof(osd_op_t *) * out_op_slots);
|
||||
}
|
||||
|
||||
int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest)
|
||||
{
|
||||
auto conn = this;
|
||||
@@ -358,14 +311,17 @@ int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest)
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address,
|
||||
uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory)
|
||||
bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg)
|
||||
{
|
||||
// Try to connect to the peer using RDMA
|
||||
msgr_rdma_address_t addr;
|
||||
if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr))
|
||||
{
|
||||
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_op_slots, rdma_op_memory);
|
||||
if (client_max_msg > rdma_max_msg)
|
||||
{
|
||||
client_max_msg = rdma_max_msg;
|
||||
}
|
||||
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg);
|
||||
if (rdma_conn)
|
||||
{
|
||||
int r = rdma_conn->connect(&addr);
|
||||
@@ -380,8 +336,6 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address,
|
||||
else
|
||||
{
|
||||
// Remember connection, but switch to RDMA only after sending the configuration response
|
||||
clients_by_qp[rdma_conn->qp->qp_num] = peer_fd;
|
||||
rdma_conn->set_out_capacity(out_data_rkey, out_op_rkey, out_op_slots, out_op_memory);
|
||||
auto cl = clients.at(peer_fd);
|
||||
cl->rdma_conn = rdma_conn;
|
||||
cl->peer_state = PEER_RDMA_CONNECTING;
|
||||
@@ -392,172 +346,83 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address,
|
||||
return false;
|
||||
}
|
||||
|
||||
static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
|
||||
{
|
||||
ibv_send_wr *bad_wr = NULL;
|
||||
ibv_send_wr wr = {
|
||||
.wr_id = (uint64_t)(cl->peer_fd*2+1),
|
||||
.sg_list = sge,
|
||||
.num_sge = op_sge,
|
||||
.opcode = IBV_WR_SEND,
|
||||
.send_flags = IBV_SEND_SIGNALED,
|
||||
};
|
||||
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
|
||||
if (err || bad_wr)
|
||||
{
|
||||
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
cl->rdma_conn->cur_send++;
|
||||
}
|
||||
|
||||
bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
|
||||
{
|
||||
auto rc = cl->rdma_conn;
|
||||
if (!cl->send_list.size() && !rc->in_slots_freed.size() || rc->cur_send >= rc->max_send)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
int i = 0;
|
||||
while (i < rc->in_slots_freed.size())
|
||||
{
|
||||
auto op_slot = rc->in_slots_freed[i++];
|
||||
assert(op_slot < 0x80000000);
|
||||
ibv_send_wr *bad_wr = NULL;
|
||||
ibv_send_wr wr = {
|
||||
.wr_id = 0,
|
||||
.opcode = IBV_WR_RDMA_WRITE_WITH_IMM,
|
||||
.imm_data = 0x80000000 | op_slot,
|
||||
};
|
||||
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
|
||||
if (err || bad_wr)
|
||||
{
|
||||
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
rc->cur_send++;
|
||||
if (rc->cur_send >= rc->max_send)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
rc->in_slots_freed.erase(rc->in_slots_freed.begin(), rc->in_slots_freed.begin()+i);
|
||||
if (!cl->send_list.size() || rc->cur_send >= rc->max_send)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
uint64_t op_size = 0, op_sge = 0;
|
||||
ibv_sge sge[rc->max_sge];
|
||||
int op_start = 0;
|
||||
while (op_start < cl->send_list.size())
|
||||
while (rc->send_pos < cl->send_list.size())
|
||||
{
|
||||
uint64_t op_data_size = 0;
|
||||
int op_end = op_start;
|
||||
while (!(cl->outbox[op_end].flags & MSGR_SENDP_LAST))
|
||||
iovec & iov = cl->send_list[rc->send_pos];
|
||||
if (op_size >= rc->max_msg || op_sge >= rc->max_sge)
|
||||
{
|
||||
op_data_size += cl->send_list[op_end].iov_len;
|
||||
op_end++;
|
||||
}
|
||||
op_data_size += cl->send_list[op_end].iov_len;
|
||||
op_end++;
|
||||
op_data_size -= cl->send_list[op_start].iov_len;
|
||||
// Operation boundaries in send_list: op_start..op_end, first iovec is the header
|
||||
uint64_t op_slot = rc->out_op_alloc->find_free();
|
||||
if (op_slot == UINT64_MAX)
|
||||
{
|
||||
// op queue is full
|
||||
return true;
|
||||
}
|
||||
uint64_t data_pos = UINT64_MAX;
|
||||
if (op_data_size >= 0)
|
||||
{
|
||||
if (rc->cur_send > rc->max_send-1-(op_end-op_start-1+rc->max_sge)/rc->max_sge)
|
||||
rc->send_sizes.push_back(op_size);
|
||||
try_send_rdma_wr(cl, sge, op_sge);
|
||||
op_sge = 0;
|
||||
op_size = 0;
|
||||
if (rc->cur_send >= rc->max_send)
|
||||
{
|
||||
// RDMA queue is full
|
||||
return true;
|
||||
}
|
||||
// FIXME: Oops, and what if op data is larger than the whole buffer... :)
|
||||
data_pos = rc->out_data_alloc.alloc(op_data_size);
|
||||
if (data_pos == UINT64_MAX)
|
||||
{
|
||||
// data buffers are full
|
||||
return true;
|
||||
}
|
||||
int cur_sge = 0;
|
||||
for (int data_sent = 1; data_sent < op_end; data_sent++)
|
||||
{
|
||||
sge[cur_sge++] = {
|
||||
.addr = (uintptr_t)cl->send_list[data_sent].iov_base,
|
||||
.length = (uint32_t)cl->send_list[data_sent].iov_len,
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
};
|
||||
if (data_sent == op_end-1 || cur_sge >= rc->max_sge)
|
||||
{
|
||||
ibv_send_wr *bad_wr = NULL;
|
||||
ibv_send_wr wr = {
|
||||
.wr_id = op_slot,
|
||||
.next = NULL,
|
||||
.sg_list = sge,
|
||||
.num_sge = cur_sge,
|
||||
.opcode = IBV_WR_RDMA_WRITE,
|
||||
.send_flags = 0,
|
||||
.wr = {
|
||||
.rdma = {
|
||||
.remote_addr = data_pos,
|
||||
.rkey = rc->out_data_rkey,
|
||||
},
|
||||
},
|
||||
};
|
||||
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
|
||||
if (err || bad_wr)
|
||||
{
|
||||
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
|
||||
exit(1);
|
||||
}
|
||||
rc->cur_send++;
|
||||
cur_sge = 0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (rc->cur_send > rc->max_send-1)
|
||||
{
|
||||
// RDMA queue is full
|
||||
return true;
|
||||
}
|
||||
rc->out_op_alloc->set(op_slot, true);
|
||||
assert(cl->send_list[op_start].iov_len == OSD_PACKET_SIZE);
|
||||
sge[0] = {
|
||||
.addr = (uintptr_t)cl->send_list[op_start].iov_base,
|
||||
.length = (uint32_t)cl->send_list[op_start].iov_len,
|
||||
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg
|
||||
? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size);
|
||||
sge[op_sge++] = {
|
||||
.addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos),
|
||||
.length = len,
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
};
|
||||
rc->out_slot_data[op_slot] = { .data_pos = data_pos, .data_size = op_data_size };
|
||||
rc->out_slot_ops[op_slot] = (cl->outbox[op_end-1].flags & MSGR_SENDP_FREE)
|
||||
? cl->outbox[op_end-1].op : NULL;
|
||||
sge[1] = {
|
||||
.addr = (uintptr_t)(rc->out_slot_data+op_slot),
|
||||
.length = sizeof(rc->out_slot_data[op_slot]),
|
||||
.lkey = rc->ctx->mr->lkey,
|
||||
};
|
||||
ibv_send_wr *bad_wr = NULL;
|
||||
ibv_send_wr wr = {
|
||||
.wr_id = op_slot,
|
||||
.next = NULL,
|
||||
.sg_list = sge,
|
||||
.num_sge = 2,
|
||||
.opcode = IBV_WR_RDMA_WRITE_WITH_IMM,
|
||||
.send_flags = IBV_SEND_SIGNALED,
|
||||
.imm_data = (uint32_t)op_slot,
|
||||
.wr = {
|
||||
.rdma = {
|
||||
.remote_addr = op_slot*sizeof(msgr_rdma_cmd_t),
|
||||
.rkey = rc->out_op_rkey,
|
||||
},
|
||||
},
|
||||
};
|
||||
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
|
||||
if (err || bad_wr)
|
||||
op_size += len;
|
||||
rc->send_buf_pos += len;
|
||||
if (rc->send_buf_pos >= iov.iov_len)
|
||||
{
|
||||
fprintf(stderr, "RDMA send failed: %s\n", strerror(err));
|
||||
exit(1);
|
||||
rc->send_pos++;
|
||||
rc->send_buf_pos = 0;
|
||||
}
|
||||
rc->cur_send++;
|
||||
op_start = op_end;
|
||||
}
|
||||
if (op_start > 0)
|
||||
if (op_sge > 0)
|
||||
{
|
||||
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_start);
|
||||
rc->send_sizes.push_back(op_size);
|
||||
try_send_rdma_wr(cl, sge, op_sge);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
|
||||
static void try_recv_rdma_wr(osd_client_t *cl, void *buf)
|
||||
{
|
||||
ibv_sge sge = {
|
||||
.addr = (uintptr_t)buf,
|
||||
.length = (uint32_t)cl->rdma_conn->max_msg,
|
||||
.lkey = cl->rdma_conn->ctx->mr->lkey,
|
||||
};
|
||||
ibv_recv_wr *bad_wr = NULL;
|
||||
ibv_recv_wr wr = {
|
||||
.wr_id = (uint64_t)(cl->peer_fd*2),
|
||||
.sg_list = sge,
|
||||
.num_sge = op_sge,
|
||||
.sg_list = &sge,
|
||||
.num_sge = 1,
|
||||
};
|
||||
int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr);
|
||||
if (err || bad_wr)
|
||||
@@ -568,87 +433,18 @@ static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
|
||||
cl->rdma_conn->cur_recv++;
|
||||
}
|
||||
|
||||
static void copy_data_to_recv_list(uint8_t *data_buf, uint64_t data_size, osd_client_t *cl)
|
||||
{
|
||||
uint64_t pos = 0;
|
||||
while (cl->recv_list.done < cl->recv_list.count)
|
||||
{
|
||||
uint64_t cur = cl->recv_list.buf[cl->recv_list.done].iov_len;
|
||||
assert(cur <= data_size-pos);
|
||||
memcpy(cl->recv_list.buf[cl->recv_list.done].iov_base, data_buf+pos, cur);
|
||||
pos += cur;
|
||||
}
|
||||
cl->recv_list.reset();
|
||||
}
|
||||
|
||||
bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
|
||||
{
|
||||
auto rc = cl->rdma_conn;
|
||||
while (rc->cur_recv < rc->max_recv)
|
||||
{
|
||||
try_recv_rdma_wr(cl, NULL, 0);
|
||||
void *buf = malloc_or_die(rc->max_msg);
|
||||
rc->recv_buffers.push_back(buf);
|
||||
try_recv_rdma_wr(cl, buf);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool osd_messenger_t::rdma_handle_op(osd_client_t *cl, uint32_t op_slot)
|
||||
{
|
||||
auto rc = cl->rdma_conn;
|
||||
if (op_slot >= rc->in_op_cap)
|
||||
{
|
||||
// Invalid incoming index
|
||||
fprintf(stderr, "Client %d invalid incoming RDMA op slot: %u, dropping connection\n", cl->peer_fd, op_slot);
|
||||
stop_client(cl->peer_fd);
|
||||
return false;
|
||||
}
|
||||
osd_op_header_t *hdr = (osd_op_header_t *)rc->in_ops[op_slot].header;
|
||||
uint8_t *data_buf = (uint8_t*)rc->in_data_buf + rc->in_ops[op_slot].pos.data_pos;
|
||||
uint64_t data_size = rc->in_ops[op_slot].pos.data_size;
|
||||
if (hdr->magic == SECONDARY_OSD_REPLY_MAGIC)
|
||||
{
|
||||
// Reply
|
||||
if (cl->read_op)
|
||||
{
|
||||
delete cl->read_op;
|
||||
cl->read_op = NULL;
|
||||
}
|
||||
if (!handle_reply_hdr(rc->in_ops[op_slot].header, cl))
|
||||
return false;
|
||||
if (cl->read_state == CL_READ_REPLY_DATA)
|
||||
{
|
||||
// copy reply data to cl->recv_list
|
||||
copy_data_to_recv_list(data_buf, data_size, cl);
|
||||
// and handle reply with data
|
||||
handle_reply_ready(cl->read_op);
|
||||
cl->read_op = NULL;
|
||||
cl->read_state = 0;
|
||||
cl->read_remaining = 0;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Operation
|
||||
cl->read_op = new osd_op_t;
|
||||
cl->read_op->peer_fd = cl->peer_fd;
|
||||
cl->read_op->op_type = OSD_OP_IN;
|
||||
memcpy(&cl->read_op->req, hdr, OSD_PACKET_SIZE);
|
||||
handle_op_hdr(cl);
|
||||
if (cl->read_state == CL_READ_DATA)
|
||||
{
|
||||
copy_data_to_recv_list(data_buf, data_size, cl);
|
||||
// And handle the incoming op with data
|
||||
cl->received_ops.push_back(cl->read_op);
|
||||
set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); });
|
||||
cl->read_op = NULL;
|
||||
cl->read_state = 0;
|
||||
}
|
||||
}
|
||||
// We don't need the incoming data buffer anymore, notify peer about it
|
||||
// FIXME: Allow to pass memory to the internal layer without copying and notify after handling it
|
||||
rc->in_slots_freed.push_back(op_slot);
|
||||
return true;
|
||||
}
|
||||
|
||||
#define RDMA_EVENTS_AT_ONCE 32
|
||||
|
||||
void osd_messenger_t::handle_rdma_events()
|
||||
@@ -673,60 +469,80 @@ void osd_messenger_t::handle_rdma_events()
|
||||
event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc);
|
||||
for (int i = 0; i < event_count; i++)
|
||||
{
|
||||
auto cqp_it = clients_by_qp.find(wc[i].qp_num);
|
||||
int peer_fd = cqp_it != clients_by_qp.end() ? cqp_it->second : -1;
|
||||
auto cl_it = clients.find(peer_fd);
|
||||
int client_id = wc[i].wr_id >> 1;
|
||||
bool is_send = wc[i].wr_id & 1;
|
||||
auto cl_it = clients.find(client_id);
|
||||
if (cl_it == clients.end())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
osd_client_t *cl = cl_it->second;
|
||||
auto rc = cl->rdma_conn;
|
||||
if (wc[i].status != IBV_WC_SUCCESS)
|
||||
{
|
||||
fprintf(stderr, "RDMA work request failed for client %d", peer_fd);
|
||||
fprintf(stderr, "RDMA work request failed for client %d", client_id);
|
||||
if (cl->osd_num)
|
||||
{
|
||||
fprintf(stderr, " (OSD %lu)", cl->osd_num);
|
||||
}
|
||||
fprintf(stderr, " with status: %s, stopping client\n", ibv_wc_status_str(wc[i].status));
|
||||
if (peer_fd >= 0)
|
||||
stop_client(peer_fd);
|
||||
stop_client(client_id);
|
||||
continue;
|
||||
}
|
||||
auto rc = cl->rdma_conn;
|
||||
if (wc[i].opcode == IBV_WC_RDMA_WRITE)
|
||||
if (!is_send)
|
||||
{
|
||||
// Operation or reply is sent, we can free it
|
||||
auto & op = rc->out_slot_ops[wc[i].wr_id];
|
||||
if (op)
|
||||
rc->cur_recv--;
|
||||
if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len))
|
||||
{
|
||||
delete op;
|
||||
op = NULL;
|
||||
// handle_read_buffer may stop the client
|
||||
continue;
|
||||
}
|
||||
rc->cur_send--;
|
||||
try_send_rdma(cl);
|
||||
try_recv_rdma_wr(cl, rc->recv_buffers[rc->next_recv_buf]);
|
||||
rc->next_recv_buf = (rc->next_recv_buf+1) % rc->recv_buffers.size();
|
||||
}
|
||||
else if (wc[i].opcode == IBV_WC_RECV)
|
||||
else
|
||||
{
|
||||
if (!(wc[i].imm_data & 0x80000000))
|
||||
rc->cur_send--;
|
||||
uint64_t sent_size = rc->send_sizes.at(0);
|
||||
rc->send_sizes.erase(rc->send_sizes.begin(), rc->send_sizes.begin()+1);
|
||||
int send_pos = 0, send_buf_pos = 0;
|
||||
while (sent_size > 0)
|
||||
{
|
||||
// Operation or reply received. Handle it
|
||||
if (!rdma_handle_op(cl, wc[i].imm_data))
|
||||
if (sent_size >= cl->send_list.at(send_pos).iov_len)
|
||||
{
|
||||
// false means that the client is stopped due to invalid operation
|
||||
continue;
|
||||
sent_size -= cl->send_list[send_pos].iov_len;
|
||||
send_pos++;
|
||||
}
|
||||
else
|
||||
{
|
||||
send_buf_pos = sent_size;
|
||||
sent_size = 0;
|
||||
}
|
||||
rc->cur_recv--;
|
||||
try_recv_rdma(cl);
|
||||
}
|
||||
else
|
||||
assert(rc->send_pos >= send_pos);
|
||||
if (rc->send_pos == send_pos)
|
||||
{
|
||||
// Outbox slot is marked as free (the remote side doesn't need it anymore)
|
||||
uint32_t op_slot = wc[i].imm_data & 0x7FFFFFFF;
|
||||
auto & pos = rc->in_ops[op_slot].pos;
|
||||
if (pos.data_size > 0)
|
||||
rc->out_data_alloc.free(pos.data_pos, pos.data_size);
|
||||
rc->out_op_alloc->set(op_slot, false);
|
||||
rc->send_buf_pos -= send_buf_pos;
|
||||
}
|
||||
rc->send_pos -= send_pos;
|
||||
for (int i = 0; i < send_pos; i++)
|
||||
{
|
||||
if (cl->outbox[i].flags & MSGR_SENDP_FREE)
|
||||
{
|
||||
// Reply fully sent
|
||||
delete cl->outbox[i].op;
|
||||
}
|
||||
}
|
||||
if (send_pos > 0)
|
||||
{
|
||||
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos);
|
||||
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos);
|
||||
}
|
||||
if (send_buf_pos > 0)
|
||||
{
|
||||
cl->send_list[0].iov_base = (uint8_t*)cl->send_list[0].iov_base + send_buf_pos;
|
||||
cl->send_list[0].iov_len -= send_buf_pos;
|
||||
}
|
||||
// Try to continue sending
|
||||
try_send_rdma(cl);
|
||||
}
|
||||
}
|
||||
|
@@ -5,11 +5,6 @@
|
||||
#include <infiniband/verbs.h>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "allocator.h"
|
||||
#include "freelist.h"
|
||||
#include "osd_ops.h"
|
||||
|
||||
struct osd_op_t;
|
||||
|
||||
struct msgr_rdma_address_t
|
||||
{
|
||||
@@ -44,17 +39,6 @@ struct msgr_rdma_context_t
|
||||
~msgr_rdma_context_t();
|
||||
};
|
||||
|
||||
struct msgr_rdma_out_pos_t
|
||||
{
|
||||
uint64_t data_pos, data_size;
|
||||
};
|
||||
|
||||
struct msgr_rdma_cmd_t
|
||||
{
|
||||
uint8_t header[OSD_PACKET_SIZE];
|
||||
msgr_rdma_out_pos_t pos;
|
||||
};
|
||||
|
||||
struct msgr_rdma_connection_t
|
||||
{
|
||||
msgr_rdma_context_t *ctx = NULL;
|
||||
@@ -62,24 +46,14 @@ struct msgr_rdma_connection_t
|
||||
msgr_rdma_address_t addr;
|
||||
int max_send = 0, max_recv = 0, max_sge = 0;
|
||||
int cur_send = 0, cur_recv = 0;
|
||||
uint64_t op_slots = 0, op_memory = 0;
|
||||
uint64_t max_msg = 0;
|
||||
|
||||
ibv_mr *in_data_mr = NULL, *in_op_mr = NULL;
|
||||
msgr_rdma_cmd_t *in_ops = NULL;
|
||||
int in_op_cap = 0;
|
||||
void *in_data_buf = NULL;
|
||||
std::vector<uint32_t> in_slots_freed;
|
||||
|
||||
uint32_t out_data_rkey = 0, out_op_rkey = 0;
|
||||
uint64_t out_op_slots = 0, out_op_memory = 0;
|
||||
allocator *out_op_alloc = NULL;
|
||||
freelist_allocator_t out_data_alloc;
|
||||
msgr_rdma_out_pos_t *out_slot_data = NULL;
|
||||
osd_op_t **out_slot_ops = NULL;
|
||||
int send_pos = 0, send_buf_pos = 0;
|
||||
int next_recv_buf = 0;
|
||||
std::vector<void*> recv_buffers;
|
||||
std::vector<uint64_t> send_sizes;
|
||||
|
||||
~msgr_rdma_connection_t();
|
||||
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send,
|
||||
uint32_t max_recv, uint32_t max_sge, uint64_t op_slots, uint64_t op_memory);
|
||||
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);
|
||||
int connect(msgr_rdma_address_t *dest);
|
||||
void set_out_capacity(uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory);
|
||||
};
|
||||
|
@@ -172,7 +172,7 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
|
||||
if (cl->read_state == CL_READ_HDR)
|
||||
{
|
||||
if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
|
||||
return handle_reply_hdr(cl->read_op->req.buf, cl);
|
||||
return handle_reply_hdr(cl);
|
||||
else if (cl->read_op->req.hdr.magic == SECONDARY_OSD_OP_MAGIC)
|
||||
handle_op_hdr(cl);
|
||||
else
|
||||
@@ -286,7 +286,7 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
|
||||
}
|
||||
}
|
||||
|
||||
bool osd_messenger_t::handle_reply_hdr(void *reply_hdr, osd_client_t *cl)
|
||||
bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
|
||||
{
|
||||
auto req_it = cl->sent_ops.find(cl->read_op->req.hdr.id);
|
||||
if (req_it == cl->sent_ops.end())
|
||||
@@ -297,7 +297,7 @@ bool osd_messenger_t::handle_reply_hdr(void *reply_hdr, osd_client_t *cl)
|
||||
return false;
|
||||
}
|
||||
osd_op_t *op = req_it->second;
|
||||
memcpy(op->reply.buf, reply_hdr, OSD_PACKET_SIZE);
|
||||
memcpy(op->reply.buf, cl->read_op->req.buf, OSD_PACKET_SIZE);
|
||||
cl->sent_ops.erase(req_it);
|
||||
if (op->reply.hdr.opcode == OSD_OP_SEC_READ || op->reply.hdr.opcode == OSD_OP_READ)
|
||||
{
|
||||
@@ -328,16 +328,14 @@ bool osd_messenger_t::handle_reply_hdr(void *reply_hdr, osd_client_t *cl)
|
||||
{
|
||||
goto reuse;
|
||||
}
|
||||
if (cl->read_op)
|
||||
delete cl->read_op;
|
||||
delete cl->read_op;
|
||||
cl->read_op = op;
|
||||
cl->read_state = CL_READ_REPLY_DATA;
|
||||
}
|
||||
else if (op->reply.hdr.opcode == OSD_OP_SEC_LIST && op->reply.hdr.retval > 0)
|
||||
{
|
||||
assert(!op->iov.count);
|
||||
if (cl->read_op)
|
||||
delete cl->read_op;
|
||||
delete cl->read_op;
|
||||
cl->read_op = op;
|
||||
cl->read_state = CL_READ_REPLY_DATA;
|
||||
cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval;
|
||||
@@ -347,8 +345,7 @@ bool osd_messenger_t::handle_reply_hdr(void *reply_hdr, osd_client_t *cl)
|
||||
else if (op->reply.hdr.opcode == OSD_OP_SEC_READ_BMP && op->reply.hdr.retval > 0)
|
||||
{
|
||||
assert(!op->iov.count);
|
||||
if (cl->read_op)
|
||||
delete cl->read_op;
|
||||
delete cl->read_op;
|
||||
cl->read_op = op;
|
||||
cl->read_state = CL_READ_REPLY_DATA;
|
||||
cl->read_remaining = op->reply.hdr.retval;
|
||||
@@ -358,8 +355,7 @@ bool osd_messenger_t::handle_reply_hdr(void *reply_hdr, osd_client_t *cl)
|
||||
}
|
||||
else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0)
|
||||
{
|
||||
if (cl->read_op)
|
||||
delete cl->read_op;
|
||||
delete cl->read_op;
|
||||
cl->read_op = op;
|
||||
cl->read_state = CL_READ_REPLY_DATA;
|
||||
cl->read_remaining = op->reply.hdr.retval;
|
||||
@@ -372,8 +368,7 @@ bool osd_messenger_t::handle_reply_hdr(void *reply_hdr, osd_client_t *cl)
|
||||
reuse:
|
||||
// It's fine to reuse cl->read_op for the next reply
|
||||
handle_reply_ready(op);
|
||||
if (cl->read_op)
|
||||
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
|
||||
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
|
||||
cl->read_remaining = OSD_PACKET_SIZE;
|
||||
cl->read_state = CL_READ_HDR;
|
||||
}
|
||||
|
@@ -96,7 +96,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
|
||||
to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->req.sec_read_bmp.len });
|
||||
to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
|
||||
}
|
||||
to_outbox[to_outbox.size()-1].flags |= MSGR_SENDP_LAST;
|
||||
if (cur_op->op_type == OSD_OP_IN)
|
||||
{
|
||||
to_outbox[to_outbox.size()-1].flags |= MSGR_SENDP_FREE;
|
||||
|
@@ -129,7 +129,6 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
|
||||
#ifdef WITH_RDMA
|
||||
if (cl->rdma_conn)
|
||||
{
|
||||
clients_by_qp.erase(cl->rdma_conn->qp->qp_num);
|
||||
delete cl->rdma_conn;
|
||||
}
|
||||
#endif
|
||||
|
@@ -759,7 +759,18 @@ static void calc_rmw_parity_copy_mod(osd_rmw_stripe_t *stripes, int pg_size, int
|
||||
uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size, uint32_t bitmap_granularity,
|
||||
uint32_t &start, uint32_t &end)
|
||||
{
|
||||
if (write_osd_set[pg_minsize] != 0 || write_osd_set != read_osd_set)
|
||||
bool required = false;
|
||||
for (int role = pg_minsize; role < pg_size; role++)
|
||||
{
|
||||
if (write_osd_set[role] != 0)
|
||||
{
|
||||
// Whole parity chunk is needed when we move the object
|
||||
if (write_osd_set[role] != read_osd_set[role])
|
||||
end = chunk_size;
|
||||
required = true;
|
||||
}
|
||||
}
|
||||
if (required && end != chunk_size)
|
||||
{
|
||||
// start & end are required for calc_rmw_parity
|
||||
for (int role = 0; role < pg_minsize; role++)
|
||||
@@ -770,14 +781,6 @@ static void calc_rmw_parity_copy_mod(osd_rmw_stripe_t *stripes, int pg_size, int
|
||||
end = std::max(stripes[role].req_end, end);
|
||||
}
|
||||
}
|
||||
for (int role = pg_minsize; role < pg_size; role++)
|
||||
{
|
||||
if (write_osd_set[role] != 0 && write_osd_set[role] != read_osd_set[role])
|
||||
{
|
||||
start = 0;
|
||||
end = chunk_size;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Set bitmap bits accordingly
|
||||
if (bitmap_granularity > 0)
|
||||
|
@@ -24,7 +24,7 @@ void test11();
|
||||
void test12();
|
||||
void test13();
|
||||
void test14();
|
||||
void test15();
|
||||
void test15(bool second);
|
||||
void test16();
|
||||
|
||||
int main(int narg, char *args[])
|
||||
@@ -54,7 +54,8 @@ int main(int narg, char *args[])
|
||||
// Test 14
|
||||
test14();
|
||||
// Test 15
|
||||
test15();
|
||||
test15(false);
|
||||
test15(true);
|
||||
// Test 16
|
||||
test16();
|
||||
// End
|
||||
@@ -826,12 +827,11 @@ void test14()
|
||||
|
||||
***/
|
||||
|
||||
void test15()
|
||||
void test15(bool second)
|
||||
{
|
||||
const int bmp = 64*1024 / 4096 / 8;
|
||||
use_ec(4, 2, true);
|
||||
osd_num_t osd_set[4] = { 1, 2, 3, 0 };
|
||||
osd_num_t write_osd_set[4] = { 1, 2, 3, 0 };
|
||||
osd_num_t osd_set[4] = { 1, 2, (osd_num_t)(second ? 0 : 3), (osd_num_t)(second ? 4 : 0) };
|
||||
osd_rmw_stripe_t stripes[4] = {};
|
||||
unsigned bitmaps[4] = { 0 };
|
||||
// Test 15.0
|
||||
@@ -842,7 +842,7 @@ void test15()
|
||||
assert(stripes[2].req_start == 0 && stripes[2].req_end == 0);
|
||||
assert(stripes[3].req_start == 0 && stripes[3].req_end == 0);
|
||||
// Test 15.1
|
||||
void *rmw_buf = calc_rmw(write_buf, stripes, osd_set, 4, 2, 3, write_osd_set, 64*1024, bmp);
|
||||
void *rmw_buf = calc_rmw(write_buf, stripes, osd_set, 4, 2, 3, osd_set, 64*1024, bmp);
|
||||
for (int i = 0; i < 4; i++)
|
||||
stripes[i].bmp_buf = bitmaps+i;
|
||||
assert(rmw_buf);
|
||||
@@ -852,32 +852,34 @@ void test15()
|
||||
assert(stripes[3].read_start == 0 && stripes[3].read_end == 0);
|
||||
assert(stripes[0].write_start == 0 && stripes[0].write_end == 0);
|
||||
assert(stripes[1].write_start == 28*1024 && stripes[1].write_end == 32*1024);
|
||||
assert(stripes[2].write_start == 28*1024 && stripes[2].write_end == 32*1024);
|
||||
assert(stripes[3].write_start == 0 && stripes[3].write_end == 0);
|
||||
assert(stripes[2+second].write_start == 28*1024 && stripes[2+second].write_end == 32*1024);
|
||||
assert(stripes[3-second].write_start == 0 && stripes[3-second].write_end == 0);
|
||||
assert(stripes[0].read_buf == (uint8_t*)rmw_buf+4*1024);
|
||||
assert(stripes[1].read_buf == NULL);
|
||||
assert(stripes[2].read_buf == NULL);
|
||||
assert(stripes[3].read_buf == NULL);
|
||||
assert(stripes[0].write_buf == NULL);
|
||||
assert(stripes[1].write_buf == (uint8_t*)write_buf);
|
||||
assert(stripes[2].write_buf == rmw_buf);
|
||||
assert(stripes[3].write_buf == NULL);
|
||||
assert(stripes[2+second].write_buf == rmw_buf);
|
||||
assert(stripes[3-second].write_buf == NULL);
|
||||
// Test 15.2 - encode
|
||||
set_pattern(write_buf, 4*1024, PATTERN1);
|
||||
set_pattern(stripes[0].read_buf, 4*1024, PATTERN2);
|
||||
memset(stripes[0].bmp_buf, 0, bmp);
|
||||
memset(stripes[1].bmp_buf, 0, bmp);
|
||||
calc_rmw_parity_ec(stripes, 4, 2, osd_set, write_osd_set, 64*1024, bmp);
|
||||
assert(*(uint32_t*)stripes[2].bmp_buf == 0x80);
|
||||
memset(stripes[2+second].write_buf, 0, 4096);
|
||||
calc_rmw_parity_ec(stripes, 4, 2, osd_set, osd_set, 64*1024, bmp);
|
||||
assert(second || *(uint32_t*)stripes[2].bmp_buf == 0x80);
|
||||
assert(stripes[0].write_start == 0 && stripes[0].write_end == 0);
|
||||
assert(stripes[1].write_start == 28*1024 && stripes[1].write_end == 32*1024);
|
||||
assert(stripes[2].write_start == 28*1024 && stripes[2].write_end == 32*1024);
|
||||
assert(stripes[3].write_start == 0 && stripes[3].write_end == 0);
|
||||
assert(stripes[2+second].write_start == 28*1024 && stripes[2+second].write_end == 32*1024);
|
||||
assert(stripes[3-second].write_start == 0 && stripes[3-second].write_end == 0);
|
||||
assert(stripes[0].write_buf == NULL);
|
||||
assert(stripes[1].write_buf == (uint8_t*)write_buf);
|
||||
assert(stripes[2].write_buf == rmw_buf);
|
||||
assert(stripes[3].write_buf == NULL);
|
||||
check_pattern(stripes[2].write_buf, 4*1024, PATTERN1^PATTERN2); // first parity is always xor :)
|
||||
assert(stripes[2+second].write_buf == rmw_buf);
|
||||
assert(stripes[3-second].write_buf == NULL);
|
||||
// first parity is always xor :), second isn't...
|
||||
check_pattern(stripes[2+second].write_buf, 4*1024, second ? 0xb79a59a0ce8b9b81 : PATTERN1^PATTERN2);
|
||||
// Done
|
||||
free(rmw_buf);
|
||||
free(write_buf);
|
||||
|
@@ -166,20 +166,15 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
|
||||
{
|
||||
// Indicate that RDMA is enabled
|
||||
wire_config["rdma_enabled"] = true;
|
||||
if (req_json["connect_rdma"].is_string() && req_json["rdma_op_memory"].uint64_value() != 0)
|
||||
if (req_json["connect_rdma"].is_string())
|
||||
{
|
||||
// Peer is trying to connect using RDMA, try to satisfy him
|
||||
bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value(),
|
||||
req_json["rdma_data_rkey"].uint64_value(), req_json["rdma_op_rkey"].uint64_value(),
|
||||
req_json["rdma_op_slots"].uint64_value(), req_json["rdma_op_memory"].uint64_value());
|
||||
bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value(), req_json["rdma_max_msg"].uint64_value());
|
||||
if (ok)
|
||||
{
|
||||
auto rc = msgr.clients.at(cur_op->peer_fd)->rdma_conn;
|
||||
wire_config["rdma_address"] = rc->addr.to_string();
|
||||
wire_config["rdma_data_rkey"] = (uint64_t)rc->in_data_mr->rkey;
|
||||
wire_config["rdma_op_rkey"] = (uint64_t)rc->in_op_mr->rkey;
|
||||
wire_config["rdma_op_slots"] = rc->op_slots;
|
||||
wire_config["rdma_op_memory"] = rc->op_memory;
|
||||
wire_config["rdma_max_msg"] = rc->max_msg;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,64 +0,0 @@
|
||||
// Copyright (c) Vitaliy Filippov, 2023+
|
||||
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdexcept>
|
||||
#include "freelist.cpp"
|
||||
|
||||
inline bool operator == (const freelist_item_t & a, const freelist_item_t & b)
|
||||
{
|
||||
return a.start == b.start && a.size == b.size;
|
||||
}
|
||||
|
||||
void dump(std::vector<freelist_item_t> & freelist)
|
||||
{
|
||||
printf("free: ");
|
||||
for (auto & item: freelist)
|
||||
{
|
||||
printf("%lx+%lx ", item.start, item.size);
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
void dump(freelist_allocator_t &alloc)
|
||||
{
|
||||
dump(alloc.freelist);
|
||||
}
|
||||
|
||||
uint64_t test_alloc(freelist_allocator_t &alloc, uint64_t size)
|
||||
{
|
||||
uint64_t r = alloc.alloc(size);
|
||||
printf("alloc %lx: %lx\n", size, r);
|
||||
return r;
|
||||
}
|
||||
|
||||
void assert_eq(freelist_allocator_t &alloc, std::vector<freelist_item_t> v)
|
||||
{
|
||||
if (alloc.freelist != v)
|
||||
{
|
||||
printf("expected ");
|
||||
dump(v);
|
||||
printf("got ");
|
||||
dump(alloc);
|
||||
throw std::runtime_error("test failed");
|
||||
}
|
||||
dump(alloc);
|
||||
}
|
||||
|
||||
int main(int narg, char *args[])
|
||||
{
|
||||
freelist_allocator_t alloc;
|
||||
alloc.free(0, 0x1000000);
|
||||
assert_eq(alloc, { { 0, 0x1000000 } });
|
||||
assert(test_alloc(alloc, 0x1000) == 0);
|
||||
assert_eq(alloc, { { 0x1000, 0xfff000 } });
|
||||
assert(test_alloc(alloc, 0x4000) == 0x1000);
|
||||
alloc.free(0x1000000, 0x4000);
|
||||
assert_eq(alloc, { { 0x5000, 0xfff000 } });
|
||||
alloc.free(0, 0x1000);
|
||||
assert_eq(alloc, { { 0, 0x1000 }, { 0x5000, 0xfff000 } });
|
||||
alloc.free(0x1000, 0x4000);
|
||||
assert_eq(alloc, { { 0, 0x1004000 } });
|
||||
return 0;
|
||||
}
|
@@ -6,7 +6,7 @@ includedir=${prefix}/@CMAKE_INSTALL_INCLUDEDIR@
|
||||
|
||||
Name: Vitastor
|
||||
Description: Vitastor client library
|
||||
Version: 0.8.5
|
||||
Version: 0.8.6
|
||||
Libs: -L${libdir} -lvitastor_client
|
||||
Cflags: -I${includedir}
|
||||
|
||||
|
Reference in New Issue
Block a user