Compare commits


5 Commits

Author SHA1 Message Date
Vitaliy Filippov 109f51a015 Implement basic VitastorFS support in CSI
2024-12-17 02:26:23 +03:00
Vitaliy Filippov 8a86c123c3 Allow to auto-select and print the port
2024-12-14 16:55:13 +03:00
Vitaliy Filippov b856524e0c Workaround for Linux bug: return post_op_attr for NFS-RDMA READ3
The Linux NFS RDMA transport has a stupid bug: when the reply doesn't contain
post_op_attr, the data gets offset by 84 bytes (the size of the attributes) and
the first 84 bytes are filled with what is probably random data.
2024-12-11 21:09:36 +03:00
Vitaliy Filippov ae3ca7451f Use per-connection RDMA device contexts 2024-12-11 21:09:36 +03:00
Vitaliy Filippov 1dbbb0c3f8 Implement NFS RDMA support 2024-12-11 21:09:36 +03:00
35 changed files with 2891 additions and 243 deletions

View File

@ -22,6 +22,8 @@ RUN apt-get update && \
(echo "APT::Install-Recommends false;" > /etc/apt/apt.conf) && \
apt-get update && \
apt-get install -y e2fsprogs xfsprogs kmod iproute2 \
# NFS mount dependencies
nfs-common netbase \
# dependencies of qemu-storage-daemon
libnuma1 liburing2 libglib2.0-0 libfuse3-3 libaio1 libzstd1 libnettle8 \
libgmp10 libhogweed6 libp11-kit0 libidn2-0 libunistring2 libtasn1-6 libpcre2-8-0 libffi8 && \

View File

@ -9,8 +9,16 @@ metadata:
provisioner: csi.vitastor.io
volumeBindingMode: Immediate
parameters:
etcdVolumePrefix: ""
poolId: "1"
# CSI driver can create block-based volumes and VitastorFS-based volumes
# only VitastorFS-based volumes and raw block volumes (without FS) support ReadWriteMany mode
# set this parameter to VitastorFS metadata volume name to use VitastorFS
# if unset, block-based volumes will be created
vitastorfs: ""
# for block-based storage classes, pool ID may be either a string (name) or a number (ID)
# for VitastorFS-based storage classes it must be a string - the name of the default pool for FS data
poolId: "testpool"
# volume name prefix for block-based storage classes or NFS subdirectory (including /) for FS-based volumes
volumePrefix: ""
# you can choose other configuration file if you have it in the config map
# different etcd URLs and prefixes should also be put in the config
#configPath: "/etc/vitastor/vitastor.conf"
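
For reference, a minimal FS-based StorageClass built from these parameters could look like the following sketch (the class name "vitastor-fs" and the FS metadata volume name "testfs" are hypothetical examples; "testpool" reuses the placeholder above):

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: vitastor-fs
provisioner: csi.vitastor.io
volumeBindingMode: Immediate
parameters:
  # hypothetical VitastorFS metadata volume name - enables FS-based (ReadWriteMany-capable) volumes
  vitastorfs: "testfs"
  # name of the default pool for FS data
  poolId: "testpool"
  volumePrefix: ""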

View File

@ -8,7 +8,6 @@ import (
"encoding/json"
"fmt"
"strings"
"strconv"
"time"
"os"
"io/ioutil"
@ -68,9 +67,10 @@ func GetConnectionParams(params map[string]string) (map[string]string, error)
{
configPath = "/etc/vitastor/vitastor.conf"
}
else
ctxVars["configPath"] = configPath
if (params["vitastorfs"] != "")
{
ctxVars["configPath"] = configPath
ctxVars["vitastorfs"] = params["vitastorfs"]
}
config := make(map[string]interface{})
configFD, err := os.Open(configPath)
@ -140,33 +140,57 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.InvalidArgument, "volume capabilities is a required field")
}
err := cs.checkCaps(volumeCapabilities)
if (err != nil)
{
return nil, err
}
etcdVolumePrefix := req.Parameters["etcdVolumePrefix"]
poolId, _ := strconv.ParseUint(req.Parameters["poolId"], 10, 64)
if (poolId == 0)
{
return nil, status.Error(codes.InvalidArgument, "poolId is missing in storage class configuration")
}
volName := etcdVolumePrefix + req.GetName()
volSize := 1 * GB
if capRange := req.GetCapacityRange(); capRange != nil
{
volSize = ((capRange.GetRequiredBytes() + MB - 1) / MB) * MB
}
ctxVars, err := GetConnectionParams(req.Parameters)
if (err != nil)
{
return nil, err
}
args := []string{ "create", volName, "-s", fmt.Sprintf("%v", volSize), "--pool", fmt.Sprintf("%v", poolId) }
err = cs.checkCaps(volumeCapabilities, ctxVars["vitastorfs"] != "")
if (err != nil)
{
return nil, err
}
pool := req.Parameters["poolId"]
if (pool == "")
{
return nil, status.Error(codes.InvalidArgument, "poolId is missing in storage class configuration")
}
volumePrefix := req.Parameters["volumePrefix"]
if (volumePrefix == "")
{
// Old name
volumePrefix = req.Parameters["etcdVolumePrefix"]
}
volName := volumePrefix + req.GetName()
volSize := 1 * GB
if capRange := req.GetCapacityRange(); capRange != nil
{
volSize = ((capRange.GetRequiredBytes() + MB - 1) / MB) * MB
}
if (ctxVars["vitastorfs"] != "")
{
// Nothing to create, subdirectories are created during mounting
// FIXME: It would be cool to support quotas some day and set it here
if (req.VolumeContentSource.GetSnapshot() != nil)
{
return nil, status.Error(codes.InvalidArgument, "VitastorFS doesn't support snapshots")
}
ctxVars["name"] = volName
ctxVars["pool"] = pool
volumeIdJson, _ := json.Marshal(ctxVars)
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
// Ugly, but VolumeContext isn't passed to DeleteVolume :-(
VolumeId: string(volumeIdJson),
CapacityBytes: volSize,
},
}, nil
}
args := []string{ "create", volName, "-s", fmt.Sprintf("%v", volSize), "--pool", pool }
// Support creation from snapshot
var src *csi.VolumeContentSource
@ -249,6 +273,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, err
}
if (ctxVars["vitastorfs"] != "")
{
// FIXME: Delete FS subdirectory
return &csi.DeleteVolumeResponse{}, nil
}
_, err = invokeCLI(ctxVars, []string{ "rm", volName })
if (err != nil)
{
@ -283,13 +313,25 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req
{
return nil, status.Error(codes.InvalidArgument, "volumeId is nil")
}
volVars := make(map[string]string)
err := json.Unmarshal([]byte(volumeID), &volVars)
if (err != nil)
{
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
}
ctxVars, err := GetConnectionParams(volVars)
if (err != nil)
{
return nil, err
}
volumeCapabilities := req.GetVolumeCapabilities()
if (volumeCapabilities == nil)
{
return nil, status.Error(codes.InvalidArgument, "volumeCapabilities is nil")
}
err := cs.checkCaps(volumeCapabilities)
err = cs.checkCaps(volumeCapabilities, ctxVars["vitastorfs"] != "")
if (err != nil)
{
return nil, err
@ -302,7 +344,7 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req
}, nil
}
func (cs *ControllerServer) checkCaps(volumeCapabilities []*csi.VolumeCapability) error
func (cs *ControllerServer) checkCaps(volumeCapabilities []*csi.VolumeCapability, fs bool) error
{
var volumeCapabilityAccessModes []*csi.VolumeCapability_AccessMode
for _, mode := range []csi.VolumeCapability_AccessMode_Mode{
@ -318,6 +360,10 @@ func (cs *ControllerServer) checkCaps(volumeCapabilities []*csi.VolumeCapability
{
if (capability.GetBlock() != nil)
{
if (fs)
{
return status.Errorf(codes.InvalidArgument, "%v not supported with FS-based volumes", capability)
}
for _, mode := range []csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER,
csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
@ -328,6 +374,12 @@ func (cs *ControllerServer) checkCaps(volumeCapabilities []*csi.VolumeCapability
}
}
if (fs)
{
// All access modes including RWX are supported with FS-based volumes
return nil
}
capabilitySupport := false
for _, capability := range volumeCapabilities
{
@ -342,7 +394,7 @@ func (cs *ControllerServer) checkCaps(volumeCapabilities []*csi.VolumeCapability
if (!capabilitySupport)
{
return status.Errorf(codes.NotFound, "%v not supported", volumeCapabilities)
return status.Errorf(codes.InvalidArgument, "%v not supported", volumeCapabilities)
}
return nil
@ -434,6 +486,12 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
{
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
}
if (ctxVars["vitastorfs"] != "")
{
return nil, status.Error(codes.InvalidArgument, "VitastorFS doesn't support snapshots")
}
volName := ctxVars["name"]
// Create image using vitastor-cli
@ -492,6 +550,11 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
return nil, err
}
if (ctxVars["vitastorfs"] != "")
{
return nil, status.Error(codes.InvalidArgument, "VitastorFS doesn't support snapshots")
}
_, err = invokeCLI(ctxVars, []string{ "rm", volName+"@"+snapName })
if (err != nil)
{
@ -523,6 +586,11 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
return nil, err
}
if (ctxVars["vitastorfs"] != "")
{
return nil, status.Error(codes.InvalidArgument, "VitastorFS doesn't support snapshots")
}
inodeCfg, err := invokeList(ctxVars, volName+"@*", false)
if (err != nil)
{
@ -586,6 +654,16 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi
return nil, err
}
if (ctxVars["vitastorfs"] != "")
{
// Nothing to change
// FIXME: Support quotas and change quota here
return &csi.ControllerExpandVolumeResponse{
CapacityBytes: req.CapacityRange.RequiredBytes,
NodeExpansionRequired: false,
}, nil
}
inodeCfg, err := invokeList(ctxVars, volName, true)
if (err != nil)
{

View File

@ -5,11 +5,15 @@ package vitastor
import (
"context"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
@ -29,13 +33,14 @@ import (
type NodeServer struct
{
*Driver
useVduse bool
stateDir string
mounter mount.Interface
useVduse bool
stateDir string
nfsStageDir string
mounter mount.Interface
restartInterval time.Duration
mu sync.Mutex
cond *sync.Cond
volumeLocks map[string]bool
mu sync.Mutex
cond *sync.Cond
volumeLocks map[string]bool
}
type DeviceState struct
@ -48,6 +53,15 @@ type DeviceState struct
PidFile string `json:"pidFile"`
}
type NfsState struct
{
ConfigPath string `json:"configPath"`
FsName string `json:"fsName"`
Pool string `json:"pool"`
Path string `json:"path"`
Port int `json:"port"`
}
// NewNodeServer create new instance node
func NewNodeServer(driver *Driver) *NodeServer
{
@ -60,11 +74,17 @@ func NewNodeServer(driver *Driver) *NodeServer
{
stateDir += "/"
}
nfsStageDir := os.Getenv("NFS_STAGE_DIR")
if (nfsStageDir == "")
{
nfsStageDir = "/var/lib/kubelet/plugins/csi.vitastor.io/nfs"
}
ns := &NodeServer{
Driver: driver,
useVduse: checkVduseSupport(),
stateDir: stateDir,
mounter: mount.New(""),
Driver: driver,
useVduse: checkVduseSupport(),
stateDir: stateDir,
nfsStageDir: nfsStageDir,
mounter: mount.New(""),
volumeLocks: make(map[string]bool),
}
ns.cond = sync.NewCond(&ns.mu)
@ -123,12 +143,12 @@ func (ns *NodeServer) restarter()
func (ns *NodeServer) restoreVduseDaemons()
{
pattern := ns.stateDir+"vitastor-vduse-*.json"
matches, err := filepath.Glob(pattern)
stateFiles, err := filepath.Glob(pattern)
if (err != nil)
{
klog.Errorf("failed to list %s: %v", pattern, err)
}
if (len(matches) == 0)
if (len(stateFiles) == 0)
{
return
}
@ -146,59 +166,162 @@ func (ns *NodeServer) restoreVduseDaemons()
klog.Errorf("/sbin/vdpa -j dev list returned bad JSON (error %v): %v", err, string(devListJSON))
return
}
for _, stateFile := range matches
for _, stateFile := range stateFiles
{
vdpaId := filepath.Base(stateFile)
vdpaId = vdpaId[0:len(vdpaId)-5]
// Check if VDPA device is still added to the bus
if (devs[vdpaId] == nil)
{
// Unused, clean it up
unmapVduseById(ns.stateDir, vdpaId)
continue
}
ns.checkVduseState(stateFile, devs)
}
}
stateJSON, err := os.ReadFile(stateFile)
func (ns *NodeServer) checkVduseState(stateFile string, devs map[string]interface{})
{
// Check if VDPA device is still added to the bus
vdpaId := filepath.Base(stateFile)
vdpaId = vdpaId[0:len(vdpaId)-5]
if (devs[vdpaId] == nil)
{
// Unused, clean it up
unmapVduseById(ns.stateDir, vdpaId)
return
}
// Read state file
stateJSON, err := os.ReadFile(stateFile)
if (err != nil)
{
klog.Warningf("error reading state file %v: %v", stateFile, err)
return
}
var state DeviceState
err = json.Unmarshal(stateJSON, &state)
if (err != nil)
{
klog.Warningf("state file %v contains invalid JSON (error %v): %v", stateFile, err, string(stateJSON))
return
}
// Lock volume
ns.lockVolume(state.ConfigPath+":block:"+state.Image)
defer ns.unlockVolume(state.ConfigPath+":block:"+state.Image)
// Recheck state file after locking
_, err = os.ReadFile(stateFile)
if (err != nil)
{
klog.Warningf("state file %v disappeared, skipping volume", stateFile)
return
}
// Check if the storage daemon is still active
pidFile := ns.stateDir + vdpaId + ".pid"
exists := false
proc, err := findByPidFile(pidFile)
if (err == nil)
{
exists = proc.Signal(syscall.Signal(0)) == nil
}
if (!exists)
{
// Restart daemon
klog.Warningf("restarting storage daemon for volume %v (VDPA ID %v)", state.Image, vdpaId)
err = startStorageDaemon(vdpaId, state.Image, pidFile, state.ConfigPath, state.Readonly)
if (err != nil)
{
klog.Warningf("error reading state file %v: %v", stateFile, err)
continue
klog.Warningf("failed to restart storage daemon for volume %v: %v", state.Image, err)
}
var state DeviceState
err = json.Unmarshal(stateJSON, &state)
}
}
func (ns *NodeServer) restoreNfsDaemons()
{
pattern := ns.stateDir+"vitastor-nfs-*.json"
stateFiles, err := filepath.Glob(pattern)
if (err != nil)
{
klog.Errorf("failed to list %s: %v", pattern, err)
}
if (len(stateFiles) == 0)
{
return
}
activeNFS, err := ns.listActiveNFS()
if (err != nil)
{
return
}
// Check all state files and try to restore active mounts
for _, stateFile := range stateFiles
{
ns.checkNfsState(stateFile, activeNFS)
}
}
func (ns *NodeServer) readNfsState(stateFile string, allowNotExists bool) (*NfsState, error)
{
stateJSON, err := os.ReadFile(stateFile)
if (err != nil)
{
if (allowNotExists && os.IsNotExist(err))
{
return nil, nil
}
klog.Warningf("error reading state file %v: %v", stateFile, err)
return nil, err
}
var state NfsState
err = json.Unmarshal(stateJSON, &state)
if (err != nil)
{
klog.Warningf("state file %v contains invalid JSON (error %v): %v", stateFile, err, string(stateJSON))
return nil, err
}
return &state, nil
}
func (ns *NodeServer) checkNfsState(stateFile string, activeNfs map[int][]string)
{
// Read state file
state, err := ns.readNfsState(stateFile, false)
if (err != nil)
{
return
}
// Lock FS
ns.lockVolume(state.ConfigPath+":fs:"+state.FsName)
defer ns.unlockVolume(state.ConfigPath+":fs:"+state.FsName)
// Check if NFS at this port is still mounted
pidFile := ns.stateDir + filepath.Base(stateFile)
pidFile = pidFile[0:len(pidFile)-5] + ".pid"
if (len(activeNfs[state.Port]) == 0)
{
// this is a stale state file, remove it
klog.Warningf("state file %v contains stale mount at port %d, removing it", stateFile, state.Port)
ns.stopNFS(stateFile, pidFile)
return
}
// Check PID file
exists := false
proc, err := findByPidFile(pidFile)
if (err == nil)
{
exists = proc.Signal(syscall.Signal(0)) == nil
}
if (!exists)
{
// Restart vitastor-nfs server
klog.Warningf("restarting NFS server for FS %v at port %v", state.FsName, state.Port)
_, _, err := system(
"/usr/bin/vitastor-nfs", "start",
"--pidfile", pidFile,
"--bind", "127.0.0.1",
"--port", fmt.Sprintf("%d", state.Port),
"--fs", state.FsName,
"--pool", state.Pool,
"--portmap", "0",
)
if (err != nil)
{
klog.Warningf("state file %v contains invalid JSON (error %v): %v", stateFile, err, string(stateJSON))
continue
klog.Warningf("failed to restart NFS server for FS %v: %v", state.FsName, err)
}
ns.lockVolume(state.ConfigPath+":"+state.Image)
// Recheck state file after locking
_, err = os.ReadFile(stateFile)
if (err != nil)
{
klog.Warningf("state file %v disappeared, skipping volume", stateFile)
ns.unlockVolume(state.ConfigPath+":"+state.Image)
continue
}
// Check if the storage daemon is still active
pidFile := ns.stateDir + vdpaId + ".pid"
exists := false
proc, err := findByPidFile(pidFile)
if (err == nil)
{
exists = proc.Signal(syscall.Signal(0)) == nil
}
if (!exists)
{
// Restart daemon
klog.Warningf("restarting storage daemon for volume %v (VDPA ID %v)", state.Image, vdpaId)
_ = startStorageDaemon(vdpaId, state.Image, pidFile, state.ConfigPath, state.Readonly)
}
ns.unlockVolume(state.ConfigPath+":"+state.Image)
}
}
@ -220,8 +343,13 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
volName := ctxVars["name"]
ns.lockVolume(ctxVars["configPath"]+":"+volName)
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
if (ctxVars["vitastorfs"] != "")
{
return &csi.NodeStageVolumeResponse{}, nil
}
ns.lockVolume(ctxVars["configPath"]+":block:"+volName)
defer ns.unlockVolume(ctxVars["configPath"]+":block:"+volName)
targetPath := req.GetStagingTargetPath()
isBlock := req.GetVolumeCapability().GetBlock() != nil
@ -408,8 +536,13 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
}
volName := ctxVars["name"]
ns.lockVolume(ctxVars["configPath"]+":"+volName)
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
if (ctxVars["vitastorfs"] != "")
{
return &csi.NodeUnstageVolumeResponse{}, nil
}
ns.lockVolume(ctxVars["configPath"]+":block:"+volName)
defer ns.unlockVolume(ctxVars["configPath"]+":block:"+volName)
targetPath := req.GetStagingTargetPath()
devicePath, _, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath)
@ -462,6 +595,153 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return &csi.NodeUnstageVolumeResponse{}, nil
}
// Mount or check if NFS is already mounted
func (ns *NodeServer) mountNFS(ctxVars map[string]string) (string, error)
{
sum := sha1.Sum([]byte(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"]))
nfsHash := hex.EncodeToString(sum[:])
stateFile := ns.stateDir+"vitastor-nfs-"+nfsHash+".json"
pidFile := ns.stateDir+"vitastor-nfs-"+nfsHash+".pid"
mountPath := ns.nfsStageDir+"/"+nfsHash
state, err := ns.readNfsState(stateFile, true)
if (state != nil)
{
return state.Path, nil
}
if (err != nil)
{
return "", err
}
err = os.MkdirAll(mountPath, 0777)
if (err != nil)
{
return "", err
}
// Create a new mount
state = &NfsState{
ConfigPath: ctxVars["configPath"],
FsName: ctxVars["vitastorfs"],
Pool: ctxVars["pool"],
Path: mountPath,
}
klog.Infof("starting new NFS server for FS %v", state.FsName)
stdout, _, err := system(
"/usr/bin/vitastor-nfs", "start",
"--pidfile", pidFile,
"--bind", "127.0.0.1",
"--port", "auto",
"--fs", state.FsName,
"--pool", state.Pool,
"--portmap", "0",
)
if (err != nil)
{
return "", err
}
match := regexp.MustCompile("Port: (\\d+)").FindStringSubmatch(string(stdout))
if (match == nil)
{
klog.Errorf("failed to find port in vitastor-nfs output: %v", string(stdout))
ns.stopNFS(stateFile, pidFile)
return "", fmt.Errorf("failed to find port in vitastor-nfs output (bad vitastor-nfs version?)")
}
port, _ := strconv.ParseUint(match[1], 0, 16)
state.Port = int(port)
// Write state file
stateJSON, _ := json.Marshal(state)
err = os.WriteFile(stateFile, stateJSON, 0600)
if (err != nil)
{
klog.Errorf("failed to write state file %v", stateFile)
ns.stopNFS(stateFile, pidFile)
return "", err
}
// Mount NFS
_, _, err = system(
"mount", "-t", "nfs", "127.0.0.1:/", state.Path,
"-o", fmt.Sprintf("port=%d,mountport=%d,nfsvers=3,soft,nolock,tcp", port, port),
)
if (err != nil)
{
ns.stopNFS(stateFile, pidFile)
return "", err
}
return state.Path, nil
}
// Check whether the NFS mount is still used by any volume and stop the server if not
func (ns *NodeServer) checkStopNFS(ctxVars map[string]string)
{
sum := sha1.Sum([]byte(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"]))
nfsHash := hex.EncodeToString(sum[:])
stateFile := ns.stateDir+"vitastor-nfs-"+nfsHash+".json"
pidFile := ns.stateDir+"vitastor-nfs-"+nfsHash+".pid"
mountPath := ns.nfsStageDir+"/"+nfsHash
state, err := ns.readNfsState(stateFile, true)
if (state == nil)
{
return
}
activeNFS, err := ns.listActiveNFS()
if (err != nil)
{
return
}
if (len(activeNFS[state.Port]) > 0)
{
return
}
// All volume mounts are detached, unmount the root mount and kill the server
err = mount.CleanupMountPoint(mountPath, ns.mounter, false)
if (err != nil)
{
klog.Errorf("failed to unmount %v: %v", mountPath, err)
return
}
ns.stopNFS(stateFile, pidFile)
}
func (ns *NodeServer) stopNFS(stateFile, pidFile string)
{
err := killByPidFile(pidFile)
if (err != nil)
{
klog.Errorf("failed to kill process with pid from %v: %v", pidFile, err)
}
os.Remove(pidFile)
os.Remove(stateFile)
}
func (ns *NodeServer) listActiveNFS() (map[int][]string, error)
{
mounts, err := mount.ParseMountInfo("/proc/self/mountinfo")
if (err != nil)
{
klog.Errorf("failed to list mounts: %v", err)
return nil, err
}
activeNFS := make(map[int][]string)
for _, mount := range mounts
{
// Volume mounts always refer to subpaths
if (mount.FsType == "nfs" && mount.Root != "/")
{
for _, opt := range mount.MountOptions
{
if (strings.HasPrefix(opt, "port="))
{
port64, err := strconv.ParseUint(opt[5:], 10, 16)
if (err == nil)
{
activeNFS[int(port64)] = append(activeNFS[int(port64)], mount.MountPoint)
}
}
}
}
}
return activeNFS, nil
}
// NodePublishVolume mounts the volume mounted to the staging path to the target path
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error)
{
@ -480,28 +760,39 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
volName := ctxVars["name"]
ns.lockVolume(ctxVars["configPath"]+":"+volName)
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
if (ctxVars["vitastorfs"] != "")
{
ns.lockVolume(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"])
defer ns.unlockVolume(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"])
}
else
{
ns.lockVolume(ctxVars["configPath"]+":block:"+volName)
defer ns.unlockVolume(ctxVars["configPath"]+":block:"+volName)
}
stagingTargetPath := req.GetStagingTargetPath()
targetPath := req.GetTargetPath()
isBlock := req.GetVolumeCapability().GetBlock() != nil
// Check that stagingTargetPath is mounted
notmnt, err := mount.IsNotMountPoint(ns.mounter, stagingTargetPath)
if (err != nil)
if (ctxVars["vitastorfs"] == "")
{
klog.Errorf("staging path %v is not mounted: %w", stagingTargetPath, err)
return nil, fmt.Errorf("staging path %v is not mounted: %w", stagingTargetPath, err)
}
else if (notmnt)
{
klog.Errorf("staging path %v is not mounted", stagingTargetPath)
return nil, fmt.Errorf("staging path %v is not mounted", stagingTargetPath)
// Check that stagingTargetPath is mounted
notmnt, err := mount.IsNotMountPoint(ns.mounter, stagingTargetPath)
if (err != nil)
{
klog.Errorf("staging path %v is not mounted: %w", stagingTargetPath, err)
return nil, fmt.Errorf("staging path %v is not mounted: %w", stagingTargetPath, err)
}
else if (notmnt)
{
klog.Errorf("staging path %v is not mounted", stagingTargetPath)
return nil, fmt.Errorf("staging path %v is not mounted", stagingTargetPath)
}
}
// Check that targetPath is not already mounted
notmnt, err = mount.IsNotMountPoint(ns.mounter, targetPath)
notmnt, err := mount.IsNotMountPoint(ns.mounter, targetPath)
if (err != nil)
{
if (os.IsNotExist(err))
@ -542,6 +833,24 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, fmt.Errorf("target path %s is already mounted", targetPath)
}
if (ctxVars["vitastorfs"] != "")
{
nfspath, err := ns.mountNFS(ctxVars)
if (err != nil)
{
ns.checkStopNFS(ctxVars)
return nil, err
}
// volName should include prefix
stagingTargetPath = nfspath+"/"+volName
err = os.MkdirAll(stagingTargetPath, 0777)
if (err != nil && !os.IsExist(err))
{
ns.checkStopNFS(ctxVars)
return nil, err
}
}
execArgs := []string{"--bind", stagingTargetPath, targetPath}
if (req.GetReadonly())
{
@ -553,6 +862,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
out, err := cmd.Output()
if (err != nil)
{
if (ctxVars["vitastorfs"] != "")
{
ns.checkStopNFS(ctxVars)
}
return nil, fmt.Errorf("Error running mount %v: %s", strings.Join(execArgs, " "), out)
}
@ -572,8 +885,16 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
}
volName := ctxVars["name"]
ns.lockVolume(ctxVars["configPath"]+":"+volName)
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
if (ctxVars["vitastorfs"] != "")
{
ns.lockVolume(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"])
defer ns.unlockVolume(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"])
}
else
{
ns.lockVolume(ctxVars["configPath"]+":block:"+volName)
defer ns.unlockVolume(ctxVars["configPath"]+":block:"+volName)
}
targetPath := req.GetTargetPath()
devicePath, _, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath)
@ -600,6 +921,11 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, err
}
if (ctxVars["vitastorfs"] != "")
{
ns.checkStopNFS(ctxVars)
}
return &csi.NodeUnpublishVolumeResponse{}, nil
}

View File

@ -61,6 +61,10 @@ pkg_check_modules(ISAL libisal)
if (ISAL_LIBRARIES)
add_definitions(-DWITH_ISAL)
endif (ISAL_LIBRARIES)
pkg_check_modules(RDMACM librdmacm)
if (RDMACM_LIBRARIES)
add_definitions(-DWITH_RDMACM)
endif (RDMACM_LIBRARIES)
add_custom_target(build_tests)
add_custom_target(test

View File

@ -5,6 +5,7 @@ project(vitastor)
# vitastor-nfs
add_executable(vitastor-nfs
nfs_proxy.cpp
nfs_proxy_rdma.cpp
nfs_block.cpp
nfs_kv.cpp
nfs_kv_create.cpp
@ -21,8 +22,10 @@ add_executable(vitastor-nfs
nfs_fsstat.cpp
nfs_mount.cpp
nfs_portmap.cpp
rdma_alloc.cpp
../util/sha256.c
proto/xdr_impl.cpp
proto/rpc_rdma_xdr.cpp
proto/rpc_xdr.cpp
proto/portmap_xdr.cpp
proto/nfs_xdr.cpp
@ -30,4 +33,5 @@ add_executable(vitastor-nfs
target_link_libraries(vitastor-nfs
vitastor_client
vitastor_kv
${RDMACM_LIBRARIES}
)

View File

@ -315,8 +315,7 @@ static int block_nfs3_read_proc(void *opaque, rpc_op_t *rop)
if (aligned_count % alignment)
aligned_count = aligned_count + alignment - (aligned_count % alignment);
aligned_count -= aligned_offset;
void *buf = malloc_or_die(aligned_count);
xdr_add_malloc(rop->xdrs, buf);
void *buf = self->malloc_or_rdma(rop, aligned_count);
cluster_op_t *op = new cluster_op_t;
op->opcode = OSD_OP_READ;
op->inode = ino_it->second;
@ -335,10 +334,22 @@ static int block_nfs3_read_proc(void *opaque, rpc_op_t *rop)
}
else
{
nfs_client_t *self = (nfs_client_t*)rop->client;
auto & reply_ok = reply->resok;
// reply_ok.data.data is already set above
reply_ok.count = reply_ok.data.size;
reply_ok.eof = 0;
if (self->rdma_conn)
{
// FIXME Linux NFS RDMA transport has a bug - when the reply
// doesn't contain post_op_attr, the data gets offset by
// 84 bytes (size of attributes)...
// So we have to fill it with RDMA. :-(
reply_ok.file_attributes = (post_op_attr){
.attributes_follow = 1,
.attributes = get_file_attributes(self, op->inode),
};
}
}
rpc_queue_reply(rop);
delete op;

View File

@ -65,7 +65,7 @@ int kv_map_type(const std::string & type)
(type == "fifo" ? NF3FIFO : -1)))))));
}
fattr3 get_kv_attributes(nfs_client_t *self, uint64_t ino, json11::Json attrs)
fattr3 get_kv_attributes(nfs_proxy_t *proxy, uint64_t ino, json11::Json attrs)
{
auto type = kv_map_type(attrs["type"].string_value());
auto mode = attrs["mode"].uint64_value();
@ -86,7 +86,7 @@ fattr3 get_kv_attributes(nfs_client_t *self, uint64_t ino, json11::Json attrs)
.rdev = (type == NF3BLK || type == NF3CHR
? (specdata3){ (uint32_t)attrs["major"].uint64_value(), (uint32_t)attrs["minor"].uint64_value() }
: (specdata3){}),
.fsid = self->parent->fsid,
.fsid = proxy->fsid,
.fileid = ino,
.atime = atime,
.mtime = mtime,
@ -349,6 +349,27 @@ kv_fs_state_t::~kv_fs_state_t()
}
}
void kv_fs_state_t::write_inode(inode_t ino, json11::Json value, bool hack_cache, std::function<void(int)> cb, std::function<bool(int, const std::string &)> cas_cb)
{
if (!proxy->rdma_context)
{
proxy->db->set(kv_inode_key(ino), value.dump(), cb, cas_cb);
return;
}
// FIXME Linux NFS RDMA transport has a bug - it corrupts the data (by offsetting it by 84 bytes)
// when the READ reply doesn't contain post_op_attr. So we have to fill post_op_attr with RDMA. :-(
// We at least cache the inode entry here so that K/V requests aren't repeated on every read.
read_hack_cache.erase(ino);
proxy->db->set(kv_inode_key(ino), value.dump(), [=](int res)
{
if (hack_cache || res != 0)
read_hack_cache.erase(ino);
else
read_hack_cache[ino] = value;
cb(res);
}, cas_cb);
}
void kv_fs_state_t::update_inode(inode_t ino, bool allow_cache, std::function<void(json11::Json::object &)> change, std::function<void(int)> cb)
{
// FIXME: Use "update" query
@ -356,12 +377,15 @@ void kv_fs_state_t::update_inode(inode_t ino, bool allow_cache, std::function<vo
{
if (!res)
{
read_hack_cache.erase(ino);
auto ientry = attrs.object_items();
change(ientry);
bool *found = new bool;
*found = true;
proxy->db->set(kv_inode_key(ino), json11::Json(ientry).dump(), [=](int res)
json11::Json ientry_json(ientry);
proxy->db->set(kv_inode_key(ino), ientry_json.dump(), [=](int res)
{
read_hack_cache.erase(ino);
if (!*found)
res = -ENOENT;
delete found;
@ -384,6 +408,8 @@ void kv_fs_state_t::update_inode(inode_t ino, bool allow_cache, std::function<vo
void kv_fs_state_t::touch_inodes()
{
// Clear RDMA read fattr3 "hack" cache every second
read_hack_cache.clear();
std::set<inode_t> q = std::move(touch_queue);
for (auto ino: q)
{

View File

@ -75,6 +75,7 @@ struct kv_fs_state_t
std::map<inode_t, kv_inode_extend_t> extends;
std::set<inode_t> touch_queue;
std::map<inode_t, uint64_t> volume_removed;
std::map<inode_t, json11::Json> read_hack_cache;
uint64_t volume_stats_ctr = 0;
uint64_t volume_touch_ctr = 0;
@ -87,6 +88,7 @@ struct kv_fs_state_t
void upgrade_db(std::function<void(int)> cb);
void defrag_all(json11::Json cfg, std::function<void(int)> cb);
void defrag_volume(inode_t ino, bool no_rm, bool dry_run, std::function<void(int, uint64_t, uint64_t, uint64_t)> cb);
void write_inode(inode_t ino, json11::Json value, bool hack_cache, std::function<void(int)> cb, std::function<bool(int, const std::string &)> cas_cb);
~kv_fs_state_t();
};
@ -116,7 +118,7 @@ nfstime3 nfstime_from_str(const std::string & s);
std::string nfstime_to_str(nfstime3 t);
std::string nfstime_now_str();
int kv_map_type(const std::string & type);
fattr3 get_kv_attributes(nfs_client_t *self, uint64_t ino, json11::Json attrs);
fattr3 get_kv_attributes(nfs_proxy_t *proxy, uint64_t ino, json11::Json attrs);
std::string kv_direntry_key(uint64_t dir_ino, const std::string & filename);
std::string kv_direntry_filename(const std::string & key);
std::string kv_inode_prefix_key(uint64_t ino, const char *prefix);

View File

@ -143,7 +143,7 @@ resume_2:
cb(st->res);
return;
}
st->self->parent->db->set(kv_inode_key(st->new_id), st->attrs.dump().c_str(), [st](int res)
st->self->parent->kvfs->write_inode(st->new_id, st->attrs, false, [st](int res)
{
st->res = res;
kv_continue_create(st, 3);
@ -267,7 +267,7 @@ template<class T, class Tok> static void kv_create_reply(kv_create_state *st, in
},
.obj_attributes = {
.attributes_follow = 1,
.attributes = get_kv_attributes(st->self, st->new_id, st->attrs),
.attributes = get_kv_attributes(st->self->parent, st->new_id, st->attrs),
},
},
};

View File

@ -68,7 +68,7 @@ int kv_nfs3_getattr_proc(void *opaque, rpc_op_t *rop)
*reply = (GETATTR3res){
.status = NFS3_OK,
.resok = (GETATTR3resok){
.obj_attributes = get_kv_attributes(self, ino, attrs),
.obj_attributes = get_kv_attributes(self->parent, ino, attrs),
},
};
}

View File

@ -102,7 +102,7 @@ resume_2:
new_ientry["ctime"] = nfstime_now_str();
st->ientry = new_ientry;
}
st->self->parent->db->set(kv_inode_key(st->ino), st->ientry.dump(), [st](int res)
st->self->parent->kvfs->write_inode(st->ino, st->ientry, false, [st](int res)
{
st->res = res;
nfs_kv_continue_link(st, 3);
@ -180,7 +180,7 @@ int kv_nfs3_link_proc(void *opaque, rpc_op_t *rop)
.resok = (LINK3resok){
.file_attributes = (post_op_attr){
.attributes_follow = 1,
.attributes = get_kv_attributes(st->self, st->ino, st->ientry),
.attributes = get_kv_attributes(st->self->parent, st->ino, st->ientry),
},
},
};

View File

@ -55,7 +55,7 @@ int kv_nfs3_lookup_proc(void *opaque, rpc_op_t *rop)
.object = xdr_copy_string(rop->xdrs, kv_fh(ino)),
.obj_attributes = {
.attributes_follow = 1,
.attributes = get_kv_attributes(self, ino, ientry),
.attributes = get_kv_attributes(self->parent, ino, ientry),
},
},
};
@ -91,10 +91,14 @@ int kv_nfs3_readlink_proc(void *opaque, rpc_op_t *rop)
}
else
{
std::string link_target = attrs["symlink"].string_value();
char *cp = (char*)self->malloc_or_rdma(rop, link_target.size()+1);
memcpy(cp, link_target.data(), link_target.size());
cp[link_target.size()] = 0;
*reply = (READLINK3res){
.status = NFS3_OK,
.resok = (READLINK3resok){
.data = xdr_copy_string(rop->xdrs, attrs["symlink"].string_value()),
.data = (xdr_string_t){ link_target.size(), cp },
},
};
}

View File

@ -37,6 +37,7 @@ static void nfs_kv_continue_read(nfs_kv_read_state *st, int state)
else if (state == 1) goto resume_1;
else if (state == 2) goto resume_2;
else if (state == 3) goto resume_3;
else if (state == 4) goto resume_4;
else
{
fprintf(stderr, "BUG: invalid state in nfs_kv_continue_read()");
@ -96,7 +97,7 @@ resume_1:
}
read_size += sizeof(shared_file_header_t);
assert(!st->aligned_buf);
st->aligned_buf = (uint8_t*)malloc_or_die(read_size);
st->aligned_buf = (uint8_t*)st->self->malloc_or_rdma(st->rop, read_size);
st->buf = st->aligned_buf + sizeof(shared_file_header_t) + st->offset;
st->op->iov.push_back(st->aligned_buf, read_size);
st->op->len = align_up(read_offset+read_size) - st->op->offset;
@ -117,7 +118,7 @@ resume_1:
resume_2:
if (st->res < 0)
{
free(st->aligned_buf);
st->self->free_or_rdma(st->rop, st->aligned_buf);
st->aligned_buf = NULL;
auto cb = std::move(st->cb);
cb(st->res);
@ -131,7 +132,7 @@ resume_2:
" 0x%jx offset 0x%jx: probably a read/write conflict, retrying\n",
st->ino, st->ientry["shared_ino"].uint64_value(), st->ientry["shared_offset"].uint64_value());
st->retry++;
free(st->aligned_buf);
st->self->free_or_rdma(st->rop, st->aligned_buf);
st->aligned_buf = NULL;
st->allow_cache = false;
goto resume_0;
@ -141,10 +142,39 @@ resume_2:
return;
}
}
else if (st->self->rdma_conn)
{
// Take ientry from read_hack_cache for RDMA connections
{
auto rh_it = st->self->parent->kvfs->read_hack_cache.find(st->ino);
if (rh_it != st->self->parent->kvfs->read_hack_cache.end())
{
st->ientry = rh_it->second;
}
}
if (st->ientry.is_null())
{
kv_read_inode(st->self->parent, st->ino, [st](int res, const std::string & value, json11::Json attrs)
{
st->res = res;
st->ientry = attrs;
nfs_kv_continue_read(st, 4);
}, st->allow_cache);
return;
resume_4:
if (st->res < 0 || kv_map_type(st->ientry["type"].string_value()) != NF3REG)
{
auto cb = std::move(st->cb);
cb(st->res < 0 ? st->res : -EINVAL);
return;
}
st->self->parent->kvfs->read_hack_cache[st->ino] = st->ientry;
}
}
st->aligned_offset = align_down(st->offset);
st->aligned_size = align_up(st->offset+st->size) - st->aligned_offset;
assert(!st->aligned_buf);
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
st->aligned_buf = (uint8_t*)st->self->malloc_or_rdma(st->rop, st->aligned_size);
st->buf = st->aligned_buf + st->offset - st->aligned_offset;
st->op = new cluster_op_t;
st->op->opcode = OSD_OP_READ;
@ -163,7 +193,7 @@ resume_2:
resume_3:
if (st->res < 0)
{
free(st->aligned_buf);
st->self->free_or_rdma(st->rop, st->aligned_buf);
st->aligned_buf = NULL;
}
auto cb = std::move(st->cb);
@ -194,11 +224,21 @@ int kv_nfs3_read_proc(void *opaque, rpc_op_t *rop)
*reply = (READ3res){ .status = vitastor_nfs_map_err(res) };
if (res == 0)
{
xdr_add_malloc(st->rop->xdrs, st->aligned_buf);
reply->resok.data.data = (char*)st->buf;
reply->resok.data.size = st->size;
reply->resok.count = st->size;
reply->resok.eof = st->eof;
if (st->self->rdma_conn)
{
// FIXME Linux NFS RDMA transport has a bug - when the reply
// doesn't contain post_op_attr, the data gets offset by
// 84 bytes (size of attributes)...
// So we have to fill it with RDMA. :-(
reply->resok.file_attributes = (post_op_attr){
.attributes_follow = 1,
.attributes = get_kv_attributes(st->self->parent, st->ino, st->ientry),
};
}
}
rpc_queue_reply(st->rop);
delete st;

View File

@ -57,7 +57,7 @@ static void kv_getattr_next(nfs_kv_readdir_state *st)
st->entries[idx].name_attributes = (post_op_attr){
// FIXME: maybe do not read parent attributes and leave them to a GETATTR?
.attributes_follow = 1,
.attributes = get_kv_attributes(st->self, st->entries[idx].fileid, ientry),
.attributes = get_kv_attributes(st->self->parent, st->entries[idx].fileid, ientry),
};
}
st->getattr_running--;
@ -126,7 +126,7 @@ resume_1:
dot.fileid = st->dir_ino;
dot.name_attributes = (post_op_attr){
.attributes_follow = 1,
.attributes = get_kv_attributes(st->self, st->dir_ino, st->ientry),
.attributes = get_kv_attributes(st->self->parent, st->dir_ino, st->ientry),
};
dot.name_handle = (post_op_fh3){
.handle_follows = 1,
@ -169,7 +169,7 @@ resume_2:
dotdot.name_attributes = (post_op_attr){
// FIXME: maybe do not read parent attributes and leave them to a GETATTR?
.attributes_follow = 1,
.attributes = get_kv_attributes(st->self,
.attributes = get_kv_attributes(st->self->parent,
st->parent_ino ? st->parent_ino : st->dir_ino,
st->parent_ino ? st->parent_ientry : st->ientry),
};

View File

@ -197,7 +197,7 @@ resume_5:
auto copy = st->ientry.object_items();
copy["nlink"] = st->ientry["nlink"].uint64_value()-1;
copy["ctime"] = nfstime_now_str();
st->self->parent->db->set(kv_inode_key(st->ino), json11::Json(copy).dump(), [st](int res)
st->self->parent->kvfs->write_inode(st->ino, copy, false, [st](int res)
{
st->res = res;
nfs_kv_continue_delete(st, 6);

View File

@ -240,7 +240,7 @@ resume_7:
copy["nlink"] = st->new_ientry["nlink"].uint64_value()-1;
copy["ctime"] = nfstime_now_str();
copy.erase("verf");
st->self->parent->db->set(kv_inode_key(st->new_direntry["ino"].uint64_value()), json11::Json(copy).dump(), [st](int res)
st->self->parent->kvfs->write_inode(st->new_direntry["ino"].uint64_value(), copy, false, [st](int res)
{
st->res = res;
nfs_kv_continue_rename(st, 8);
@ -328,7 +328,7 @@ resume_11:
ientry_new["parent_ino"] = st->new_dir_ino;
ientry_new["ctime"] = nfstime_now_str();
ientry_new.erase("verf");
st->self->parent->db->set(kv_inode_key(st->old_direntry["ino"].uint64_value()), json11::Json(ientry_new).dump(), [st](int res)
st->self->parent->kvfs->write_inode(st->old_direntry["ino"].uint64_value(), ientry_new, false, [st](int res)
{
st->res = res;
nfs_kv_continue_rename(st, 12);

View File

@ -84,7 +84,7 @@ resume_1:
}
st->new_attrs.erase("verf");
st->new_attrs["ctime"] = nfstime_now_str();
st->self->parent->db->set(kv_inode_key(st->ino), json11::Json(st->new_attrs).dump(), [st](int res)
st->self->parent->kvfs->write_inode(st->ino, st->new_attrs, false, [st](int res)
{
st->res = res;
nfs_kv_continue_setattr(st, 2);
@ -190,7 +190,7 @@ int kv_nfs3_setattr_proc(void *opaque, rpc_op_t *rop)
.obj_wcc = (wcc_data){
.after = (post_op_attr){
.attributes_follow = 1,
.attributes = get_kv_attributes(st->self, st->ino, st->new_attrs),
.attributes = get_kv_attributes(st->self->parent, st->ino, st->new_attrs),
},
},
},

View File

@ -553,7 +553,7 @@ static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t of
}
}
static std::string new_normal_ientry(nfs_kv_write_state *st)
static json11::Json new_normal_ientry(nfs_kv_write_state *st)
{
auto ni = st->ientry.object_items();
ni.erase("empty");
@ -564,10 +564,10 @@ static std::string new_normal_ientry(nfs_kv_write_state *st)
ni["size"] = st->ext->cur_extend;
ni["ctime"] = ni["mtime"] = nfstime_now_str();
ni.erase("verf");
return json11::Json(ni).dump();
return ni;
}
static std::string new_moved_ientry(nfs_kv_write_state *st)
static json11::Json new_moved_ientry(nfs_kv_write_state *st)
{
auto ni = st->ientry.object_items();
ni.erase("empty");
@ -578,10 +578,10 @@ static std::string new_moved_ientry(nfs_kv_write_state *st)
ni["size"] = st->new_size;
ni["ctime"] = ni["mtime"] = nfstime_now_str();
ni.erase("verf");
return json11::Json(ni).dump();
return ni;
}
static std::string new_shared_ientry(nfs_kv_write_state *st)
static json11::Json new_shared_ientry(nfs_kv_write_state *st)
{
auto ni = st->ientry.object_items();
ni.erase("empty");
@ -589,10 +589,10 @@ static std::string new_shared_ientry(nfs_kv_write_state *st)
ni["ctime"] = ni["mtime"] = nfstime_now_str();
ni["shared_ver"] = ni["shared_ver"].uint64_value()+1;
ni.erase("verf");
return json11::Json(ni).dump();
return ni;
}
static std::string new_unshared_ientry(nfs_kv_write_state *st)
static json11::Json new_unshared_ientry(nfs_kv_write_state *st)
{
auto ni = st->ientry.object_items();
ni.erase("empty");
@ -602,7 +602,7 @@ static std::string new_unshared_ientry(nfs_kv_write_state *st)
ni.erase("shared_ver");
ni["ctime"] = ni["mtime"] = nfstime_now_str();
ni.erase("verf");
return json11::Json(ni).dump();
return ni;
}
static void nfs_kv_extend_inode(nfs_kv_write_state *st, int state, int base_state)
@ -612,7 +612,7 @@ static void nfs_kv_extend_inode(nfs_kv_write_state *st, int state, int base_stat
st->ext->cur_extend = st->ext->next_extend;
st->ext->next_extend = 0;
st->res2 = -EAGAIN;
st->proxy->db->set(kv_inode_key(st->ino), new_normal_ientry(st), [st, base_state](int res)
st->proxy->kvfs->write_inode(st->ino, new_normal_ientry(st), true, [st, base_state](int res)
{
st->res = res;
nfs_kv_continue_write(st, base_state+1);
@ -838,7 +838,7 @@ resume_4:
cb(st->res);
return;
}
st->proxy->db->set(kv_inode_key(st->ino), new_moved_ientry(st), [st](int res)
st->proxy->kvfs->write_inode(st->ino, new_moved_ientry(st), true, [st](int res)
{
st->res = res;
nfs_kv_continue_write(st, 5);
@ -881,7 +881,7 @@ resume_7:
}
resume_8:
// We always have to change inode entry on shared writes
st->proxy->db->set(kv_inode_key(st->ino), new_shared_ientry(st), [st](int res)
st->proxy->kvfs->write_inode(st->ino, new_shared_ientry(st), true, [st](int res)
{
st->res = res;
nfs_kv_continue_write(st, 9);
@ -930,7 +930,7 @@ resume_11:
return;
}
}
st->proxy->db->set(kv_inode_key(st->ino), new_unshared_ientry(st), [st](int res)
st->proxy->kvfs->write_inode(st->ino, new_unshared_ientry(st), true, [st](int res)
{
st->res = res;
nfs_kv_continue_write(st, 12);
@ -953,7 +953,7 @@ resume_12:
}
// Record removed part of the shared inode as obsolete in statistics
st->proxy->kvfs->volume_removed[st->ientry["shared_ino"].uint64_value()] += st->ientry["shared_alloc"].uint64_value();
st->ientry_text = new_unshared_ientry(st);
st->ientry_text = new_unshared_ientry(st).dump();
}
// Non-shared write
nfs_do_align_write(st, st->ino, st->offset, 0, 13);

View File

@ -34,6 +34,9 @@ const char *exe_name = NULL;
nfs_proxy_t::~nfs_proxy_t()
{
#ifdef WITH_RDMACM
destroy_rdma();
#endif
if (kvfs)
delete kvfs;
if (blockfs)
@ -65,9 +68,16 @@ static const char* help_text =
"\n"
"vitastor-nfs (--fs <NAME> | --block) start\n"
" Start network NFS server. Options:\n"
" --bind <IP> bind service to <IP> address (default 0.0.0.0)\n"
" --port <PORT> use port <PORT> for NFS services (default is 2049)\n"
" --portmap 0 do not listen on port 111 (portmap/rpcbind, requires root)\n"
" --bind <IP> bind service to <IP> address (default 0.0.0.0)\n"
" --port <PORT> use port <PORT> for NFS services (default is 2049)\n"
" specify \"auto\" to auto-select and print port\n"
" --portmap 0 do not listen on port 111 (portmap/rpcbind, requires root)\n"
" --nfs_rdma <PORT> enable NFS-RDMA at RDMA-CM port <PORT> (you can try 20049)\n"
" if RDMA is enabled and --port is set to 0, TCP will be disabled\n"
" --nfs_rdma_credit 16 maximum operation credit for RDMA clients (max iodepth)\n"
" --nfs_rdma_send 1024 maximum RDMA send operation count (should be larger than iodepth)\n"
" --nfs_rdma_alloc 1M RDMA memory allocation rounding\n"
" --nfs_rdma_gc 64M maximum unused RDMA buffers\n"
"\n"
"vitastor-nfs --fs <NAME> upgrade\n"
" Upgrade FS metadata. Can be run online, but server(s) should be restarted\n"
@ -184,6 +194,7 @@ void nfs_proxy_t::run(json11::Json cfg)
srand48(tv.tv_sec*1000000000 + tv.tv_nsec);
server_id = (uint64_t)lrand48() | ((uint64_t)lrand48() << 31) | ((uint64_t)lrand48() << 62);
// Parse options
mountpoint = cfg["mount"].string_value();
if (cfg["logfile"].string_value() != "")
logfile = cfg["logfile"].string_value();
pidfile = cfg["pidfile"].string_value();
@ -194,8 +205,24 @@ void nfs_proxy_t::run(json11::Json cfg)
default_pool = cfg["pool"].as_string();
portmap_enabled = !json_is_false(cfg["portmap"]);
nfs_port = cfg["port"].uint64_value() & 0xffff;
nfs_rdma_port = cfg["nfs_rdma"].uint64_value() & 0xffff;
// Allow RDMA-only mode if port is explicitly set to 0
// Allow port auto-selection in server mode if explicitly set to --port auto
nfs_port_auto = cfg["port"] == "auto";
if (!nfs_port)
nfs_port = 2049;
nfs_port = nfs_port_auto ? 0 : (!cfg["port"].is_null() && nfs_rdma_port ? -1 : 2049);
nfs_rdma_credit = cfg["nfs_rdma_credit"].uint64_value();
if (!nfs_rdma_credit)
nfs_rdma_credit = 16;
nfs_rdma_max_send = cfg["nfs_rdma_send"].uint64_value();
if (!nfs_rdma_max_send)
nfs_rdma_max_send = 1024;
nfs_rdma_alloc = cfg["nfs_rdma_alloc"].uint64_value();
if (!nfs_rdma_alloc)
nfs_rdma_alloc = 1048576;
nfs_rdma_gc = cfg["nfs_rdma_gc"].uint64_value();
if (!nfs_rdma_gc)
nfs_rdma_gc = 64*1048576;
export_root = cfg["nfspath"].string_value();
if (!export_root.size())
export_root = "/";
@ -207,7 +234,6 @@ void nfs_proxy_t::run(json11::Json cfg)
obj["client_writeback_allowed"] = true;
cfg = obj;
}
mountpoint = cfg["mount"].string_value();
if (mountpoint != "")
{
bind_address = "127.0.0.1";
@ -292,49 +318,60 @@ void nfs_proxy_t::run(json11::Json cfg)
void nfs_proxy_t::run_server(json11::Json cfg)
{
if (nfs_port != -1)
{
// Create NFS socket and add it to epoll
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, &listening_port);
fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int nfs_socket, int epoll_events)
{
if (epoll_events & EPOLLRDHUP)
{
fprintf(stderr, "Listening portmap socket disconnected, exiting\n");
exit(1);
}
else
{
do_accept(nfs_socket);
}
});
if (nfs_port_auto)
{
printf("Port: %d\n", listening_port);
}
}
else
{
listening_port = nfs_rdma_port;
}
// Self-register portmap and NFS
pmap.reg_ports.insert((portmap_id_t){
.prog = PMAP_PROGRAM,
.vers = PMAP_V2,
.port = portmap_enabled ? 111 : nfs_port,
.port = (unsigned)(portmap_enabled ? 111 : listening_port),
.owner = "portmapper-service",
.addr = portmap_enabled ? "0.0.0.0.0.111" : ("0.0.0.0.0."+std::to_string(nfs_port)),
.addr = portmap_enabled ? "0.0.0.0.0.111" : ("0.0.0.0.0."+std::to_string(listening_port)),
});
pmap.reg_ports.insert((portmap_id_t){
.prog = PMAP_PROGRAM,
.vers = PMAP_V3,
.port = portmap_enabled ? 111 : nfs_port,
.port = (unsigned)(portmap_enabled ? 111 : listening_port),
.owner = "portmapper-service",
.addr = portmap_enabled ? "0.0.0.0.0.111" : ("0.0.0.0.0."+std::to_string(nfs_port)),
.addr = portmap_enabled ? "0.0.0.0.0.111" : ("0.0.0.0.0."+std::to_string(listening_port)),
});
pmap.reg_ports.insert((portmap_id_t){
.prog = NFS_PROGRAM,
.vers = NFS_V3,
.port = nfs_port,
.port = (unsigned)listening_port,
.owner = "nfs-server",
.addr = "0.0.0.0.0."+std::to_string(nfs_port),
.addr = "0.0.0.0.0."+std::to_string(listening_port),
});
pmap.reg_ports.insert((portmap_id_t){
.prog = MOUNT_PROGRAM,
.vers = MOUNT_V3,
.port = nfs_port,
.port = (unsigned)listening_port,
.owner = "rpc.mountd",
.addr = "0.0.0.0.0."+std::to_string(nfs_port),
});
// Create NFS socket and add it to epoll
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, &listening_port);
fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int nfs_socket, int epoll_events)
{
if (epoll_events & EPOLLRDHUP)
{
fprintf(stderr, "Listening portmap socket disconnected, exiting\n");
exit(1);
}
else
{
do_accept(nfs_socket);
}
.addr = "0.0.0.0.0."+std::to_string(listening_port),
});
if (portmap_enabled)
{
@ -354,6 +391,10 @@ void nfs_proxy_t::run_server(json11::Json cfg)
}
});
}
if (nfs_rdma_port)
{
rdma_context = create_rdma(bind_address, nfs_rdma_port, nfs_rdma_credit, nfs_rdma_max_send, nfs_rdma_alloc, nfs_rdma_gc);
}
if (mountpoint != "")
{
mount_fs();
@ -499,6 +540,20 @@ void nfs_proxy_t::check_default_pool()
}
}
nfs_client_t *nfs_proxy_t::create_client()
{
auto cli = new nfs_client_t();
cli->parent = this;
if (kvfs)
nfs_kv_procs(cli);
else
nfs_block_procs(cli);
for (auto & fn: pmap.proc_table)
cli->proc_table.insert(fn);
rpc_clients.insert(cli);
return cli;
}
void nfs_proxy_t::do_accept(int listen_fd)
{
struct sockaddr_storage addr;
@ -512,18 +567,8 @@ void nfs_proxy_t::do_accept(int listen_fd)
fcntl(nfs_fd, F_SETFL, fcntl(nfs_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1;
setsockopt(nfs_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
auto cli = new nfs_client_t();
if (kvfs)
nfs_kv_procs(cli);
else
nfs_block_procs(cli);
cli->parent = this;
auto cli = this->create_client();
cli->nfs_fd = nfs_fd;
for (auto & fn: pmap.proc_table)
{
cli->proc_table.insert(fn);
}
rpc_clients[nfs_fd] = cli;
epmgr->tfd->set_fd_handler(nfs_fd, true, [cli](int nfs_fd, int epoll_events)
{
// Handle incoming event
@ -780,11 +825,17 @@ void nfs_client_t::stop()
stopped = true;
if (refs <= 0)
{
#ifdef WITH_RDMACM
destroy_rdma_conn();
#endif
auto parent = this->parent;
parent->rpc_clients.erase(nfs_fd);
parent->rpc_clients.erase(this);
parent->active_connections--;
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
close(nfs_fd);
if (nfs_fd >= 0)
{
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
close(nfs_fd);
}
delete this;
parent->check_exit();
}
@ -813,8 +864,7 @@ void nfs_client_t::handle_send(int result)
if (rop)
{
// Reply fully sent
xdr_reset(rop->xdrs);
parent->xdr_pool.push_back(rop->xdrs);
parent->free_xdr(rop->xdrs);
if (rop->buffer && rop->referenced)
{
// Dereference the buffer
@ -831,7 +881,7 @@ void nfs_client_t::handle_send(int result)
{
// FIXME Maybe put free_buffers into parent
free_buffers.push_back((rpc_free_buffer_t){
.buf = rop->buffer,
.buf = (uint8_t*)rop->buffer,
.size = ub.size,
});
used_buffers.erase(rop->buffer);
@ -876,8 +926,6 @@ void nfs_client_t::handle_send(int result)
void rpc_queue_reply(rpc_op_t *rop)
{
nfs_client_t *self = (nfs_client_t*)rop->client;
iovec *iov_list = NULL;
unsigned iov_count = 0;
int r = xdr_encode(rop->xdrs, (xdrproc_t)xdr_rpc_msg, &rop->out_msg);
assert(r);
if (rop->reply_fn != NULL)
@ -885,55 +933,78 @@ void rpc_queue_reply(rpc_op_t *rop)
r = xdr_encode(rop->xdrs, rop->reply_fn, rop->reply);
assert(r);
}
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
assert(iov_count > 0);
rop->reply_marker = 0;
for (unsigned i = 0; i < iov_count; i++)
#ifdef WITH_RDMACM
if (!self->rdma_conn)
#endif
{
rop->reply_marker += iov_list[i].iov_len;
}
rop->reply_marker = htobe32(rop->reply_marker | 0x80000000);
auto & to_send_list = self->write_msg.msg_iovlen ? self->next_send_list : self->send_list;
auto & to_outbox = self->write_msg.msg_iovlen ? self->next_outbox : self->outbox;
to_send_list.push_back((iovec){ .iov_base = &rop->reply_marker, .iov_len = 4 });
to_outbox.push_back(NULL);
for (unsigned i = 0; i < iov_count; i++)
{
to_send_list.push_back(iov_list[i]);
iovec *iov_list = NULL;
unsigned iov_count = 0;
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
assert(iov_count > 0);
rop->reply_marker = 0;
for (unsigned i = 0; i < iov_count; i++)
{
rop->reply_marker += iov_list[i].iov_len;
}
rop->reply_marker = htobe32(rop->reply_marker | 0x80000000);
auto & to_send_list = self->write_msg.msg_iovlen ? self->next_send_list : self->send_list;
auto & to_outbox = self->write_msg.msg_iovlen ? self->next_outbox : self->outbox;
to_send_list.push_back((iovec){ .iov_base = &rop->reply_marker, .iov_len = 4 });
to_outbox.push_back(NULL);
for (unsigned i = 0; i < iov_count; i++)
{
to_send_list.push_back(iov_list[i]);
to_outbox.push_back(NULL);
}
to_outbox[to_outbox.size()-1] = rop;
self->submit_send();
}
to_outbox[to_outbox.size()-1] = rop;
self->submit_send();
#ifdef WITH_RDMACM
else
{
self->rdma_queue_reply(rop);
}
#endif
}
int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len)
XDR *nfs_proxy_t::get_xdr()
{
// Take an XDR object from the pool
XDR *xdrs;
if (parent->xdr_pool.size())
if (xdr_pool.size())
{
xdrs = parent->xdr_pool.back();
parent->xdr_pool.pop_back();
xdrs = xdr_pool.back();
xdr_pool.pop_back();
}
else
{
xdrs = xdr_create();
}
return xdrs;
}
void nfs_proxy_t::free_xdr(XDR *xdrs)
{
xdr_reset(xdrs);
xdr_pool.push_back(xdrs);
}
int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len)
{
XDR *xdrs = parent->get_xdr();
// Decode the RPC header
char inmsg_data[sizeof(rpc_msg)];
rpc_msg *inmsg = (rpc_msg*)&inmsg_data;
if (!xdr_decode(xdrs, msg_buf, msg_len, (xdrproc_t)xdr_rpc_msg, inmsg))
{
// Invalid message, ignore it
xdr_reset(xdrs);
parent->xdr_pool.push_back(xdrs);
parent->free_xdr(xdrs);
return 0;
}
if (inmsg->body.dir != RPC_CALL)
{
// Reply sent to the server? Strange thing. Also ignore it
xdr_reset(xdrs);
parent->xdr_pool.push_back(xdrs);
parent->free_xdr(xdrs);
return 0;
}
if (inmsg->body.cbody.rpcvers != RPC_MSG_VERSION)
@ -968,6 +1039,17 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
// Incoming buffer isn't needed to handle request, so return 0
return 0;
}
auto rop = create_rpc_op(xdrs, base_buf, inmsg, NULL);
if (!rop)
{
// No such procedure
return 0;
}
return handle_rpc_op(rop);
}
rpc_op_t *nfs_client_t::create_rpc_op(XDR *xdrs, void *buffer, rpc_msg *inmsg, rdma_msg *rmsg)
{
// Find decoder for the request
auto proc_it = proc_table.find((rpc_service_proc_t){
.prog = inmsg->body.cbody.prog,
@ -995,6 +1077,7 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(sizeof(rpc_op_t));
*rop = (rpc_op_t){
.client = this,
.buffer = buffer,
.xdrs = xdrs,
.out_msg = (rpc_msg){
.xid = inmsg->xid,
@ -1017,9 +1100,15 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
},
},
};
// FIXME: malloc and avoid copy?
memcpy(&rop->in_msg, inmsg, sizeof(rpc_msg));
if (rmsg)
{
memcpy(&rop->in_rdma_msg, rmsg, sizeof(rdma_msg));
}
rpc_queue_reply(rop);
// Incoming buffer isn't needed to handle request, so return 0
return 0;
return NULL;
}
// Allocate memory
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(
@ -1028,7 +1117,7 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
rpc_reply_stat x = RPC_MSG_ACCEPTED;
*rop = (rpc_op_t){
.client = this,
.buffer = (uint8_t*)base_buf,
.buffer = buffer,
.xdrs = xdrs,
.out_msg = (rpc_msg){
.xid = inmsg->xid,
@ -1045,10 +1134,25 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
.request = ((uint8_t*)rop) + sizeof(rpc_op_t),
.reply = ((uint8_t*)rop) + sizeof(rpc_op_t) + proc_it->req_size,
};
// FIXME: malloc and avoid copy?
memcpy(&rop->in_msg, inmsg, sizeof(rpc_msg));
if (rmsg)
{
memcpy(&rop->in_rdma_msg, rmsg, sizeof(rdma_msg));
}
return rop;
}
int nfs_client_t::handle_rpc_op(rpc_op_t *rop)
{
// Try to decode the request
// req_fn may be NULL, that means function has no arguments
if (proc_it->req_fn && !proc_it->req_fn(xdrs, rop->request))
auto proc_it = proc_table.find((rpc_service_proc_t){
.prog = rop->in_msg.body.cbody.prog,
.vers = rop->in_msg.body.cbody.vers,
.proc = rop->in_msg.body.cbody.proc,
});
if (proc_it == proc_table.end() || proc_it->req_fn && !proc_it->req_fn(rop->xdrs, rop->request))
{
// Invalid request
rop->out_msg.body.rbody.areply.reply_data.stat = RPC_GARBAGE_ARGS;
@ -1058,18 +1162,55 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
}
rop->out_msg.body.rbody.areply.reply_data.stat = RPC_SUCCESS;
rop->reply_fn = proc_it->resp_fn;
rop->referenced = 0;
int ref = proc_it->handler_fn(proc_it->opaque, rop);
rop->referenced = ref ? 1 : 0;
if (ref)
rop->referenced = 1;
return ref;
}
void *nfs_client_t::malloc_or_rdma(rpc_op_t *rop, size_t size)
{
#ifdef WITH_RDMACM
if (!rdma_conn)
{
#endif
void *buf = malloc_or_die(size);
xdr_add_malloc(rop->xdrs, buf);
return buf;
#ifdef WITH_RDMACM
}
void *buf = rdma_malloc(size);
xdr_set_rdma_chunk(rop->xdrs, buf);
return buf;
#endif
}
void nfs_client_t::free_or_rdma(rpc_op_t *rop, void *buf)
{
#ifdef WITH_RDMACM
if (!rdma_conn)
{
#endif
xdr_del_malloc(rop->xdrs, buf);
free(buf);
#ifdef WITH_RDMACM
}
else
{
xdr_set_rdma_chunk(rop->xdrs, NULL);
rdma_free(buf);
}
#endif
}
void nfs_proxy_t::daemonize()
{
// Stop all clients because client I/O sometimes breaks during daemonize
// I.e. the new process stops receiving events on the old FD
// It doesn't happen if we call sleep(1) here, but we don't want to call sleep(1)...
for (auto & clp: rpc_clients)
clp.second->stop();
for (auto & cli: rpc_clients)
cli->stop();
if (fork())
exit(0);
setsid();
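For orientation, a minimal sketch of how a procedure handler can use malloc_or_rdma() together with rpc_queue_reply(); the handler name, the READ3args field names and the synchronous read are assumptions for illustration, not code from this commit:
// Hypothetical READ3 handler sketch (assumes the project's nfs_proxy.h / generated nfs headers).
// malloc_or_rdma() returns a plain malloc() buffer on TCP connections and an
// RDMA-registered chunk (installed via xdr_set_rdma_chunk) on RDMA connections.
static int nfs3_read_proc_sketch(void *opaque, rpc_op_t *rop)
{
    nfs_client_t *self = (nfs_client_t*)rop->client;
    READ3args *args = (READ3args*)rop->request;      // assumed fields: offset, count
    // rop->reply points at the reply structure registered for this procedure;
    // its DDP-eligible data<> field should reference an RDMA-capable buffer:
    void *buf = self->malloc_or_rdma(rop, args->count);
    // ... read args->count bytes at args->offset into buf,
    //     then store buf/args->count into the reply's data field ...
    rpc_queue_reply(rop);
    return 0; // the incoming request buffer is no longer referenced
}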


@ -22,6 +22,7 @@ class cli_tool_t;
struct kv_fs_state_t;
struct block_fs_state_t;
class nfs_client_t;
struct nfs_rdma_context_t;
class nfs_proxy_t
{
@ -33,7 +34,13 @@ public:
std::string default_pool;
std::string export_root;
bool portmap_enabled;
unsigned nfs_port;
bool nfs_port_auto = false;
unsigned nfs_port = 0;
unsigned nfs_rdma_port = 0;
uint32_t nfs_rdma_credit = 16;
uint32_t nfs_rdma_max_send = 1024;
uint64_t nfs_rdma_alloc = 1048576;
uint64_t nfs_rdma_gc = 500*1048576;
int trace = 0;
std::string logfile = "/dev/null";
std::string pidfile;
@ -55,7 +62,8 @@ public:
vitastorkv_dbw_t *db = NULL;
kv_fs_state_t *kvfs = NULL;
block_fs_state_t *blockfs = NULL;
std::map<int, nfs_client_t*> rpc_clients;
nfs_rdma_context_t* rdma_context = NULL;
std::set<nfs_client_t*> rpc_clients;
std::vector<XDR*> xdr_pool;
@ -72,12 +80,20 @@ public:
void watch_stats();
void parse_stats(etcd_kv_t & kv);
void check_default_pool();
nfs_client_t* create_client();
void do_accept(int listen_fd);
void daemonize();
void write_pid();
void mount_fs();
void check_already_mounted();
void check_exit();
nfs_rdma_context_t* create_rdma(const std::string & bind_address, int rdmacm_port,
uint32_t max_iodepth, uint32_t max_send_wr, uint64_t rdma_malloc_round_to, uint64_t rdma_max_unused_buffers);
void destroy_rdma();
XDR *get_xdr();
void free_xdr(XDR *xdrs);
};
struct rpc_cur_buffer_t
@ -101,19 +117,24 @@ struct rpc_free_buffer_t
unsigned size;
};
struct nfs_rdma_conn_t;
class nfs_client_t
{
public:
nfs_proxy_t *parent = NULL;
int nfs_fd;
int epoll_events = 0;
int refs = 0;
bool stopped = false;
std::set<rpc_service_proc_t> proc_table;
nfs_rdma_conn_t *rdma_conn = NULL;
// <TCP>
int nfs_fd = -1;
int epoll_events = 0;
// Read state
rpc_cur_buffer_t cur_buffer = { 0 };
std::map<uint8_t*, rpc_used_buffer_t> used_buffers;
std::map<void*, rpc_used_buffer_t> used_buffers;
std::vector<rpc_free_buffer_t> free_buffers;
iovec read_iov;
@ -130,7 +151,16 @@ public:
void submit_send();
void handle_send(int result);
int handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len);
// </TCP>
rpc_op_t *create_rpc_op(XDR *xdrs, void *buffer, rpc_msg *inmsg, rdma_msg *rmsg);
int handle_rpc_op(rpc_op_t *rop);
bool deref();
void stop();
void *malloc_or_rdma(rpc_op_t *rop, size_t size);
void free_or_rdma(rpc_op_t *rop, void *buf);
void *rdma_malloc(size_t size);
void rdma_free(void *buf);
void rdma_queue_reply(rpc_op_t *rop);
void destroy_rdma_conn();
};

src/nfs/nfs_proxy_rdma.cpp Normal file (1076 lines)

File diff suppressed because it is too large


@ -168,7 +168,7 @@ struct WRITE3args {
offset3 offset;
count3 count;
stable_how stable;
opaque data<>;
opaque data<>; /* RDMA DDP-eligible */
};
typedef opaque writeverf3[NFS3_WRITEVERFSIZE];
@ -409,7 +409,7 @@ struct READ3resok {
post_op_attr file_attributes;
count3 count;
bool eof;
opaque data<>;
opaque data<>; /* RDMA DDP-eligible */
};
struct READ3resfail {
@ -514,7 +514,7 @@ typedef string nfspath3<>;
struct symlinkdata3 {
sattr3 symlink_attributes;
nfspath3 symlink_data;
nfspath3 symlink_data; /* RDMA DDP-eligible */
};
struct SYMLINK3args {
@ -546,7 +546,7 @@ struct READLINK3args {
struct READLINK3resok {
post_op_attr symlink_attributes;
nfspath3 data;
nfspath3 data; /* RDMA DDP-eligible */
};
struct READLINK3resfail {


@ -272,7 +272,7 @@ xdr_WRITE3args (XDR *xdrs, WRITE3args *objp)
return FALSE;
if (!xdr_stable_how (xdrs, &objp->stable))
return FALSE;
if (!xdr_bytes(xdrs, &objp->data, ~0))
if (!xdr_bytes(xdrs, &objp->data, ~0, true))
return FALSE;
return TRUE;
}
@ -829,7 +829,7 @@ xdr_READ3resok (XDR *xdrs, READ3resok *objp)
return FALSE;
if (!xdr_bool (xdrs, &objp->eof))
return FALSE;
if (!xdr_bytes(xdrs, &objp->data, ~0))
if (!xdr_bytes(xdrs, &objp->data, ~0, true))
return FALSE;
return TRUE;
}
@ -1173,10 +1173,10 @@ xdr_PATHCONF3res (XDR *xdrs, PATHCONF3res *objp)
}
bool_t
xdr_nfspath3 (XDR *xdrs, nfspath3 *objp)
xdr_nfspath3 (XDR *xdrs, nfspath3 *objp, bool rdma_chunk)
{
if (!xdr_string (xdrs, objp, ~0))
if (!xdr_string (xdrs, objp, ~0, rdma_chunk))
return FALSE;
return TRUE;
}
@ -1187,7 +1187,7 @@ xdr_symlinkdata3 (XDR *xdrs, symlinkdata3 *objp)
if (!xdr_sattr3 (xdrs, &objp->symlink_attributes))
return FALSE;
if (!xdr_nfspath3 (xdrs, &objp->symlink_data))
if (!xdr_nfspath3 (xdrs, &objp->symlink_data, true))
return FALSE;
return TRUE;
}
@ -1259,7 +1259,7 @@ xdr_READLINK3resok (XDR *xdrs, READLINK3resok *objp)
if (!xdr_post_op_attr (xdrs, &objp->symlink_attributes))
return FALSE;
if (!xdr_nfspath3 (xdrs, &objp->data))
if (!xdr_nfspath3 (xdrs, &objp->data, true))
return FALSE;
return TRUE;
}


@ -0,0 +1,53 @@
diff --git a/src/nfs/proto/nfs_xdr.cpp b/src/nfs/proto/nfs_xdr.cpp
index 87451293..5897e6ad 100644
--- a/src/nfs/proto/nfs_xdr.cpp
+++ b/src/nfs/proto/nfs_xdr.cpp
@@ -272,7 +272,7 @@ xdr_WRITE3args (XDR *xdrs, WRITE3args *objp)
return FALSE;
if (!xdr_stable_how (xdrs, &objp->stable))
return FALSE;
- if (!xdr_bytes(xdrs, &objp->data, ~0))
+ if (!xdr_bytes(xdrs, &objp->data, ~0, true))
return FALSE;
return TRUE;
}
@@ -829,7 +829,7 @@ xdr_READ3resok (XDR *xdrs, READ3resok *objp)
return FALSE;
if (!xdr_bool (xdrs, &objp->eof))
return FALSE;
- if (!xdr_bytes(xdrs, &objp->data, ~0))
+ if (!xdr_bytes(xdrs, &objp->data, ~0, true))
return FALSE;
return TRUE;
}
@@ -1173,10 +1173,10 @@ xdr_PATHCONF3res (XDR *xdrs, PATHCONF3res *objp)
}
bool_t
-xdr_nfspath3 (XDR *xdrs, nfspath3 *objp)
+xdr_nfspath3 (XDR *xdrs, nfspath3 *objp, bool rdma_chunk)
{
- if (!xdr_string (xdrs, objp, ~0))
+ if (!xdr_string (xdrs, objp, ~0, rdma_chunk))
return FALSE;
return TRUE;
}
@@ -1187,7 +1187,7 @@ xdr_symlinkdata3 (XDR *xdrs, symlinkdata3 *objp)
if (!xdr_sattr3 (xdrs, &objp->symlink_attributes))
return FALSE;
- if (!xdr_nfspath3 (xdrs, &objp->symlink_data))
+ if (!xdr_nfspath3 (xdrs, &objp->symlink_data, true))
return FALSE;
return TRUE;
}
@@ -1259,7 +1259,7 @@ xdr_READLINK3resok (XDR *xdrs, READLINK3resok *objp)
if (!xdr_post_op_attr (xdrs, &objp->symlink_attributes))
return FALSE;
- if (!xdr_nfspath3 (xdrs, &objp->data))
+ if (!xdr_nfspath3 (xdrs, &objp->data, true))
return FALSE;
return TRUE;
}


@ -1,6 +1,7 @@
#pragma once
#include "rpc.h"
#include "rpc_rdma.h"
struct rpc_op_t;
@ -27,12 +28,16 @@ inline bool operator < (const rpc_service_proc_t & a, const rpc_service_proc_t &
return a.prog < b.prog || a.prog == b.prog && (a.vers < b.vers || a.vers == b.vers && a.proc < b.proc);
}
struct rdma_msg;
struct rpc_op_t
{
void *client;
uint8_t *buffer;
void *buffer;
XDR *xdrs;
rpc_msg in_msg, out_msg;
rdma_msg in_rdma_msg;
rpc_rdma_errcode rdma_error;
void *request;
void *reply;
xdrproc_t reply_fn;

src/nfs/proto/rpc_rdma.h Normal file (144 lines)

@ -0,0 +1,144 @@
/*
* Please do not edit this file.
* It was generated using rpcgen.
*/
#ifndef _RPC_RDMA_H_RPCGEN
#define _RPC_RDMA_H_RPCGEN
#include "xdr_impl.h"
#ifdef __cplusplus
extern "C" {
#endif
struct xdr_rdma_segment {
uint32_t handle;
uint32_t length;
uint64_t offset;
};
typedef struct xdr_rdma_segment xdr_rdma_segment;
struct xdr_read_chunk {
uint32_t position;
struct xdr_rdma_segment target;
};
typedef struct xdr_read_chunk xdr_read_chunk;
struct xdr_read_list {
struct xdr_read_chunk entry;
struct xdr_read_list *next;
};
typedef struct xdr_read_list xdr_read_list;
struct xdr_write_chunk {
struct {
u_int target_len;
struct xdr_rdma_segment *target_val;
} target;
};
typedef struct xdr_write_chunk xdr_write_chunk;
struct xdr_write_list {
struct xdr_write_chunk entry;
struct xdr_write_list *next;
};
typedef struct xdr_write_list xdr_write_list;
struct rpc_rdma_header {
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
};
typedef struct rpc_rdma_header rpc_rdma_header;
struct rpc_rdma_header_nomsg {
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
};
typedef struct rpc_rdma_header_nomsg rpc_rdma_header_nomsg;
struct rpc_rdma_header_padded {
uint32_t rdma_align;
uint32_t rdma_thresh;
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
};
typedef struct rpc_rdma_header_padded rpc_rdma_header_padded;
enum rpc_rdma_errcode {
ERR_VERS = 1,
ERR_CHUNK = 2,
};
typedef enum rpc_rdma_errcode rpc_rdma_errcode;
struct rpc_rdma_errvers {
uint32_t rdma_vers_low;
uint32_t rdma_vers_high;
};
typedef struct rpc_rdma_errvers rpc_rdma_errvers;
struct rpc_rdma_error {
rpc_rdma_errcode err;
union {
rpc_rdma_errvers range;
};
};
typedef struct rpc_rdma_error rpc_rdma_error;
enum rdma_proc {
RDMA_MSG = 0,
RDMA_NOMSG = 1,
RDMA_MSGP = 2,
RDMA_DONE = 3,
RDMA_ERROR = 4,
};
typedef enum rdma_proc rdma_proc;
struct rdma_body {
rdma_proc proc;
union {
rpc_rdma_header rdma_msg;
rpc_rdma_header_nomsg rdma_nomsg;
rpc_rdma_header_padded rdma_msgp;
rpc_rdma_error rdma_error;
};
};
typedef struct rdma_body rdma_body;
struct rdma_msg {
uint32_t rdma_xid;
uint32_t rdma_vers;
uint32_t rdma_credit;
rdma_body rdma_body;
};
typedef struct rdma_msg rdma_msg;
/* the xdr functions */
extern bool_t xdr_xdr_rdma_segment (XDR *, xdr_rdma_segment*);
extern bool_t xdr_xdr_read_chunk (XDR *, xdr_read_chunk*);
extern bool_t xdr_xdr_read_list (XDR *, xdr_read_list*);
extern bool_t xdr_xdr_write_chunk (XDR *, xdr_write_chunk*);
extern bool_t xdr_xdr_write_list (XDR *, xdr_write_list*);
extern bool_t xdr_rpc_rdma_header (XDR *, rpc_rdma_header*);
extern bool_t xdr_rpc_rdma_header_nomsg (XDR *, rpc_rdma_header_nomsg*);
extern bool_t xdr_rpc_rdma_header_padded (XDR *, rpc_rdma_header_padded*);
extern bool_t xdr_rpc_rdma_errcode (XDR *, rpc_rdma_errcode*);
extern bool_t xdr_rpc_rdma_errvers (XDR *, rpc_rdma_errvers*);
extern bool_t xdr_rpc_rdma_error (XDR *, rpc_rdma_error*);
extern bool_t xdr_rdma_proc (XDR *, rdma_proc*);
extern bool_t xdr_rdma_body (XDR *, rdma_body*);
extern bool_t xdr_rdma_msg (XDR *, rdma_msg*);
#ifdef __cplusplus
}
#endif
#endif /* !_RPC_RDMA_H_RPCGEN */

src/nfs/proto/rpc_rdma.x Normal file (166 lines)

@ -0,0 +1,166 @@
/* RFC 8166 - Remote Direct Memory Access Transport for Remote Procedure Call Version 1 */
/*
* Copyright (c) 2010-2017 IETF Trust and the persons
* identified as authors of the code. All rights reserved.
*
* The authors of the code are:
* B. Callaghan, T. Talpey, and C. Lever
*
* Redistribution and use in source and binary forms, with
* or without modification, are permitted provided that the
* following conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the
* following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* - Neither the name of Internet Society, IETF or IETF
* Trust, nor the names of specific contributors, may be
* used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS
* AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
* Plain RDMA segment (Section 3.4.3)
*/
struct xdr_rdma_segment {
uint32_t handle; /* Registered memory handle */
uint32_t length; /* Length of the chunk in bytes */
uint64_t offset; /* Chunk virtual address or offset */
};
/*
* RDMA read segment (Section 3.4.5)
*/
struct xdr_read_chunk {
uint32_t position; /* Position in XDR stream */
struct xdr_rdma_segment target;
};
/*
* Read list (Section 4.3.1)
*/
struct xdr_read_list {
struct xdr_read_chunk entry;
struct xdr_read_list *next;
};
/*
* Write chunk (Section 3.4.6)
*/
struct xdr_write_chunk {
struct xdr_rdma_segment target<>;
};
/*
* Write list (Section 4.3.2)
*/
struct xdr_write_list {
struct xdr_write_chunk entry;
struct xdr_write_list *next;
};
/*
* Chunk lists (Section 4.3)
*/
struct rpc_rdma_header {
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
/* rpc body follows */
};
struct rpc_rdma_header_nomsg {
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
};
/* Not to be used */
struct rpc_rdma_header_padded {
uint32_t rdma_align;
uint32_t rdma_thresh;
struct xdr_read_list *rdma_reads;
struct xdr_write_list *rdma_writes;
struct xdr_write_chunk *rdma_reply;
/* rpc body follows */
};
/*
* Error handling (Section 4.5)
*/
enum rpc_rdma_errcode {
ERR_VERS = 1, /* Value fixed for all versions */
ERR_CHUNK = 2
};
/* Structure fixed for all versions */
struct rpc_rdma_errvers {
uint32_t rdma_vers_low;
uint32_t rdma_vers_high;
};
union rpc_rdma_error switch (rpc_rdma_errcode err) {
case ERR_VERS:
rpc_rdma_errvers range;
case ERR_CHUNK:
void;
};
/*
* Procedures (Section 4.2.4)
*/
enum rdma_proc {
RDMA_MSG = 0, /* Value fixed for all versions */
RDMA_NOMSG = 1, /* Value fixed for all versions */
RDMA_MSGP = 2, /* Not to be used */
RDMA_DONE = 3, /* Not to be used */
RDMA_ERROR = 4 /* Value fixed for all versions */
};
/* The position of the proc discriminator field is
* fixed for all versions */
union rdma_body switch (rdma_proc proc) {
case RDMA_MSG:
rpc_rdma_header rdma_msg;
case RDMA_NOMSG:
rpc_rdma_header_nomsg rdma_nomsg;
case RDMA_MSGP: /* Not to be used */
rpc_rdma_header_padded rdma_msgp;
case RDMA_DONE: /* Not to be used */
void;
case RDMA_ERROR:
rpc_rdma_error rdma_error;
};
/*
* Fixed header fields (Section 4.2)
*/
struct rdma_msg {
uint32_t rdma_xid; /* Position fixed for all versions */
uint32_t rdma_vers; /* Position fixed for all versions */
uint32_t rdma_credit; /* Position fixed for all versions */
rdma_body rdma_body;
};


@ -0,0 +1,200 @@
/*
* Please do not edit this file.
* It was generated using rpcgen.
*/
#include "rpc_rdma.h"
#include "xdr_impl_inline.h"
bool_t
xdr_xdr_rdma_segment (XDR *xdrs, xdr_rdma_segment *objp)
{
if (!xdr_uint32_t (xdrs, &objp->handle))
return FALSE;
if (!xdr_uint32_t (xdrs, &objp->length))
return FALSE;
if (!xdr_uint64_t (xdrs, &objp->offset))
return FALSE;
return TRUE;
}
bool_t
xdr_xdr_read_chunk (XDR *xdrs, xdr_read_chunk *objp)
{
if (!xdr_uint32_t (xdrs, &objp->position))
return FALSE;
if (!xdr_xdr_rdma_segment (xdrs, &objp->target))
return FALSE;
return TRUE;
}
bool_t
xdr_xdr_read_list (XDR *xdrs, xdr_read_list *objp)
{
if (!xdr_xdr_read_chunk (xdrs, &objp->entry))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
return FALSE;
return TRUE;
}
bool_t
xdr_xdr_write_chunk (XDR *xdrs, xdr_write_chunk *objp)
{
if (!xdr_array (xdrs, (char **)&objp->target.target_val, (u_int *) &objp->target.target_len, ~0,
sizeof (xdr_rdma_segment), (xdrproc_t) xdr_xdr_rdma_segment))
return FALSE;
return TRUE;
}
bool_t
xdr_xdr_write_list (XDR *xdrs, xdr_write_list *objp)
{
if (!xdr_xdr_write_chunk (xdrs, &objp->entry))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_header (XDR *xdrs, rpc_rdma_header *objp)
{
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_header_nomsg (XDR *xdrs, rpc_rdma_header_nomsg *objp)
{
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_header_padded (XDR *xdrs, rpc_rdma_header_padded *objp)
{
if (!xdr_uint32_t (xdrs, &objp->rdma_align))
return FALSE;
if (!xdr_uint32_t (xdrs, &objp->rdma_thresh))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
return FALSE;
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_errcode (XDR *xdrs, rpc_rdma_errcode *objp)
{
if (!xdr_enum (xdrs, (enum_t *) objp))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_errvers (XDR *xdrs, rpc_rdma_errvers *objp)
{
if (!xdr_uint32_t (xdrs, &objp->rdma_vers_low))
return FALSE;
if (!xdr_uint32_t (xdrs, &objp->rdma_vers_high))
return FALSE;
return TRUE;
}
bool_t
xdr_rpc_rdma_error (XDR *xdrs, rpc_rdma_error *objp)
{
if (!xdr_rpc_rdma_errcode (xdrs, &objp->err))
return FALSE;
switch (objp->err) {
case ERR_VERS:
if (!xdr_rpc_rdma_errvers (xdrs, &objp->range))
return FALSE;
break;
case ERR_CHUNK:
break;
default:
return FALSE;
}
return TRUE;
}
bool_t
xdr_rdma_proc (XDR *xdrs, rdma_proc *objp)
{
if (!xdr_enum (xdrs, (enum_t *) objp))
return FALSE;
return TRUE;
}
bool_t
xdr_rdma_body (XDR *xdrs, rdma_body *objp)
{
if (!xdr_rdma_proc (xdrs, &objp->proc))
return FALSE;
switch (objp->proc) {
case RDMA_MSG:
if (!xdr_rpc_rdma_header (xdrs, &objp->rdma_msg))
return FALSE;
break;
case RDMA_NOMSG:
if (!xdr_rpc_rdma_header_nomsg (xdrs, &objp->rdma_nomsg))
return FALSE;
break;
case RDMA_MSGP:
if (!xdr_rpc_rdma_header_padded (xdrs, &objp->rdma_msgp))
return FALSE;
break;
case RDMA_DONE:
break;
case RDMA_ERROR:
if (!xdr_rpc_rdma_error (xdrs, &objp->rdma_error))
return FALSE;
break;
default:
return FALSE;
}
return TRUE;
}
bool_t
xdr_rdma_msg (XDR *xdrs, rdma_msg *objp)
{
if (!xdr_uint32_t (xdrs, &objp->rdma_xid))
return FALSE;
if (!xdr_uint32_t (xdrs, &objp->rdma_vers))
return FALSE;
if (!xdr_uint32_t (xdrs, &objp->rdma_credit))
return FALSE;
if (!xdr_rdma_body (xdrs, &objp->rdma_body))
return FALSE;
return TRUE;
}
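For orientation, a short sketch of how these generated routines combine with the existing xdr_decode() helper to parse the transport header of a received RPC-over-RDMA message; the buffer names and error handling are assumptions, the actual receive path lives in nfs_proxy_rdma.cpp (diff suppressed above):
// Sketch: decode the fixed RPC-over-RDMA header from a received buffer.
XDR *xdrs = parent->get_xdr();
rdma_msg rmsg;
if (!xdr_decode(xdrs, rdma_recv_buf, rdma_recv_len, (xdrproc_t)xdr_rdma_msg, &rmsg) ||
    rmsg.rdma_vers != 1 ||
    (rmsg.rdma_body.proc != RDMA_MSG && rmsg.rdma_body.proc != RDMA_NOMSG))
{
    // Unsupported version or procedure - a real server would answer with RDMA_ERROR
    parent->free_xdr(xdrs);
    return;
}
// rmsg.rdma_credit is the client's requested credit limit;
// rmsg.rdma_body.rdma_msg.rdma_reads / rdma_writes / rdma_reply describe the chunk
// lists, and for RDMA_MSG the plain rpc_msg call header follows in the same buffer.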


@ -46,3 +46,5 @@ run_rpcgen() {
run_rpcgen nfs
run_rpcgen rpc
run_rpcgen portmap
run_rpcgen rpc_rdma
patch nfs_xdr.cpp < nfs_xdr.cpp.diff


@ -16,6 +16,22 @@ void xdr_destroy(XDR* xdrs)
delete xdrs;
}
void xdr_set_rdma(XDR *xdrs)
{
xdrs->rdma = true;
}
void xdr_set_rdma_chunk(XDR *xdrs, void *chunk)
{
assert(!xdrs->rdma_chunk || !chunk);
xdrs->rdma_chunk = chunk;
}
void* xdr_get_rdma_chunk(XDR *xdrs)
{
return xdrs->rdma_chunk;
}
void xdr_reset(XDR *xdrs)
{
for (auto buf: xdrs->allocs)
@ -23,6 +39,9 @@ void xdr_reset(XDR *xdrs)
free(buf);
}
xdrs->buf = NULL;
xdrs->rdma = false;
xdrs->rdma_chunk = NULL;
xdrs->rdma_chunk_used = false;
xdrs->avail = 0;
xdrs->allocs.resize(0);
xdrs->in_linked_list.resize(0);
@ -45,6 +64,20 @@ int xdr_encode(XDR *xdrs, xdrproc_t fn, void *data)
return fn(xdrs, data);
}
size_t xdr_encode_get_size(XDR *xdrs)
{
size_t len = 0;
for (auto & buf: xdrs->buf_list)
{
len += buf.iov_len;
}
if (xdrs->last_end < xdrs->cur_out.size())
{
len += xdrs->cur_out.size() - xdrs->last_end;
}
return len;
}
void xdr_encode_finish(XDR *xdrs, iovec **iov_list, unsigned *iov_count)
{
if (xdrs->last_end < xdrs->cur_out.size())
@ -83,6 +116,18 @@ void xdr_add_malloc(XDR *xdrs, void *buf)
xdrs->allocs.push_back(buf);
}
void xdr_del_malloc(XDR *xdrs, void *buf)
{
for (int i = 0; i < xdrs->allocs.size(); i++)
{
if (xdrs->allocs[i] == buf)
{
xdrs->allocs.erase(xdrs->allocs.begin()+i);
break;
}
}
}
xdr_string_t xdr_copy_string(XDR *xdrs, const std::string & str)
{
char *cp = (char*)malloc_or_die(str.size()+1);


@ -55,6 +55,15 @@ void xdr_destroy(XDR* xdrs);
// Free resources from any previous xdr_decode/xdr_encode calls
void xdr_reset(XDR *xdrs);
// Mark XDR as used for RDMA
void xdr_set_rdma(XDR *xdrs);
// Set (single) RDMA chunk buffer for this xdr before decoding an RDMA message
void xdr_set_rdma_chunk(XDR *xdrs, void *chunk);
// Get the current RDMA chunk buffer
void* xdr_get_rdma_chunk(XDR *xdrs);
// Try to decode <size> bytes from buffer <buf> using <fn>
// Result may contain memory allocations that will be valid until the next call to xdr_{reset,destroy,decode,encode}
int xdr_decode(XDR *xdrs, void *buf, unsigned size, xdrproc_t fn, void *data);
@ -64,6 +73,9 @@ int xdr_decode(XDR *xdrs, void *buf, unsigned size, xdrproc_t fn, void *data);
// May be called multiple times to encode multiple parts of the same message
int xdr_encode(XDR *xdrs, xdrproc_t fn, void *data);
// Get current size of encoded data in <xdrs>
size_t xdr_encode_get_size(XDR *xdrs);
// Get the result of previous xdr_encodes as a list of <struct iovec>'s
// in <iov_list> (start) and <iov_count> (count).
// The resulting iov_list is valid until the next call to xdr_{reset,destroy}.
@ -74,6 +86,9 @@ void xdr_encode_finish(XDR *xdrs, iovec **iov_list, unsigned *iov_count);
// Remember an allocated buffer to free it later on xdr_reset() or xdr_destroy()
void xdr_add_malloc(XDR *xdrs, void *buf);
// Remove an allocated buffer from XDR
void xdr_del_malloc(XDR *xdrs, void *buf);
xdr_string_t xdr_copy_string(XDR *xdrs, const std::string & str);
xdr_string_t xdr_copy_string(XDR *xdrs, const char *str);
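The two RDMA hooks above tie into the "/* RDMA DDP-eligible */" fields marked in nfs.x: with xdr_set_rdma() and a chunk buffer installed, xdr_bytes()/xdr_string() resolve such a field to the chunk instead of inline stream data. A minimal decode-side sketch, assuming the chunk has already been fetched into chunk_buf with an RDMA Read:
// Sketch: prepare an XDR for decoding an RDMA call with one DDP-eligible field
// (e.g. WRITE3args.data). Only a single chunk per message is handled here.
XDR *xdrs = parent->get_xdr();
xdr_set_rdma(xdrs);                     // mark the XDR as used for RDMA
xdr_set_rdma_chunk(xdrs, chunk_buf);    // install the chunk before decoding
rpc_msg inmsg;
if (xdr_decode(xdrs, inline_buf, inline_len, (xdrproc_t)xdr_rpc_msg, &inmsg))
{
    // create_rpc_op()/handle_rpc_op() then decode the procedure arguments;
    // the opaque data<> field comes out pointing at chunk_buf (see xdr_bytes()).
}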


@ -28,6 +28,19 @@
// RPC over TCP:
//
// BE 32bit length, then rpc_msg, then the procedure message itself
//
// RPC over RDMA:
// RFC 8166 - Remote Direct Memory Access Transport for Remote Procedure Call Version 1
// RFC 8267 - Network File System (NFS) Upper-Layer Binding to RPC-over-RDMA Version 1
// RFC 8797 - Remote Direct Memory Access - Connection Manager (RDMA-CM) Private Data for RPC-over-RDMA Version 1
// message is received in an RDMA Receive operation
// message: list of read chunks, list of write chunks, optional reply write chunk, then actual RPC body if present
// read chunk: BE 32bit position, BE 32bit registered memory key, BE 32bit length, BE 64bit offset
// write chunk: BE 32bit registered memory key, BE 32bit length, BE 64bit offset
// in reality for NFS 3.0: only 1 read chunk in write3 and symlink3, only 1 write chunk in read3 and readlink3
// read chunk is read by the server using RDMA Read from the client memory after receiving RPC request
// write chunk is pushed by the server using RDMA Write to the client memory before sending RPC reply
// connection is established using RDMA-CM at default port 20049
#pragma once
@ -35,6 +48,7 @@
#include <string.h>
#include <endian.h>
#include <assert.h>
#include <vector>
#include "malloc_or_die.h"
@ -61,6 +75,9 @@ struct xdr_linked_list_t
struct XDR
{
int x_op;
bool rdma = false;
void *rdma_chunk = NULL;
bool rdma_chunk_used = false;
// For decoding:
uint8_t *buf = NULL;
@ -106,13 +123,22 @@ inline int xdr_opaque(XDR *xdrs, void *data, uint32_t len)
return 1;
}
inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_chunk = false)
{
if (xdrs->x_op == XDR_DECODE)
{
if (xdrs->avail < 4)
return 0;
uint32_t len = be32toh(*((uint32_t*)xdrs->buf));
if (rdma_chunk && xdrs->rdma && xdrs->rdma_chunk)
{
// Take (only a single) RDMA chunk from xdrs->rdma_chunk while decoding
assert(!xdrs->rdma_chunk_used);
xdrs->rdma_chunk_used = true;
data->data = (char*)xdrs->rdma_chunk;
data->size = len;
return 1;
}
uint32_t padded = len_pad4(len);
if (xdrs->avail < 4+padded)
return 0;
@ -123,7 +149,8 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
}
else
{
if (data->size < XDR_COPY_LENGTH)
// Always encode RDMA chunks as separate iovecs
if (data->size < XDR_COPY_LENGTH && (!rdma_chunk || !xdrs->rdma))
{
unsigned old = xdrs->cur_out.size();
xdrs->cur_out.resize(old + 4+data->size);
@ -146,8 +173,9 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
.iov_len = data->size,
});
}
if (data->size & 3)
if ((data->size & 3) && (!rdma_chunk || !xdrs->rdma))
{
// No padding for RDMA chunks
int pad = 4-(data->size & 3);
unsigned old = xdrs->cur_out.size();
xdrs->cur_out.resize(old+pad);
@ -158,9 +186,9 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
return 1;
}
inline int xdr_string(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
inline int xdr_string(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_chunk = false)
{
return xdr_bytes(xdrs, data, maxlen);
return xdr_bytes(xdrs, data, maxlen, rdma_chunk);
}
inline int xdr_u_int(XDR *xdrs, void *data)
@ -182,6 +210,11 @@ inline int xdr_u_int(XDR *xdrs, void *data)
return 1;
}
inline int xdr_uint32_t(XDR *xdrs, void *data)
{
return xdr_u_int(xdrs, data);
}
inline int xdr_enum(XDR *xdrs, void *data)
{
return xdr_u_int(xdrs, data);

src/nfs/rdma_alloc.cpp Normal file (216 lines)

@ -0,0 +1,216 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Simple & stupid RDMA-enabled memory allocator (allocates buffers within ibv_mr's)
#include <stdio.h>
#include <assert.h>
#include <map>
#include <set>
#include "rdma_alloc.h"
#include "malloc_or_die.h"
struct rdma_region_t
{
void *buf = NULL;
size_t len = 0;
ibv_mr *mr = NULL;
};
struct rdma_frag_t
{
rdma_region_t *rgn = NULL;
size_t len = 0;
bool is_free = false;
};
struct rdma_free_t
{
size_t len = 0;
void *buf = NULL;
};
inline bool operator < (const rdma_free_t &a, const rdma_free_t &b)
{
return a.len < b.len || a.len == b.len && a.buf < b.buf;
}
struct rdma_allocator_t
{
size_t rdma_alloc_size = 1048576;
size_t rdma_max_unused = 500*1048576;
int rdma_access = IBV_ACCESS_LOCAL_WRITE;
ibv_pd *pd = NULL;
std::set<rdma_region_t*> regions;
std::map<void*, rdma_frag_t> frags;
std::set<rdma_free_t> freelist;
size_t freebuffers = 0;
};
rdma_allocator_t *rdma_malloc_create(ibv_pd *pd, size_t rdma_alloc_size, size_t rdma_max_unused, int rdma_access)
{
rdma_allocator_t *self = new rdma_allocator_t();
self->pd = pd;
self->rdma_alloc_size = rdma_alloc_size ? rdma_alloc_size : 1048576;
self->rdma_max_unused = rdma_max_unused ? rdma_max_unused : 500*1048576;
self->rdma_access = rdma_access;
return self;
}
static void rdma_malloc_free_unused_buffers(rdma_allocator_t *self, size_t max_unused, bool force)
{
auto free_it = self->freelist.end();
if (free_it == self->freelist.begin())
return;
free_it--;
do
{
auto frag_it = self->frags.find(free_it->buf);
assert(frag_it != self->frags.end());
if (frag_it->second.len != frag_it->second.rgn->len)
{
if (force)
{
fprintf(stderr, "BUG: Attempt to destroy RDMA allocator while buffers are not freed yet\n");
abort();
}
break;
}
self->freebuffers -= frag_it->second.rgn->len;
ibv_dereg_mr(frag_it->second.rgn->mr);
free(frag_it->second.rgn);
self->regions.erase(frag_it->second.rgn);
self->frags.erase(frag_it);
if (free_it == self->freelist.begin())
{
self->freelist.erase(free_it);
break;
}
self->freelist.erase(free_it--);
} while (self->freebuffers > max_unused);
}
void rdma_malloc_destroy(rdma_allocator_t *self)
{
rdma_malloc_free_unused_buffers(self, 0, true);
assert(!self->freebuffers);
assert(!self->regions.size());
assert(!self->frags.size());
assert(!self->freelist.size());
delete self;
}
void *rdma_malloc_alloc(rdma_allocator_t *self, size_t size)
{
auto it = self->freelist.lower_bound((rdma_free_t){ .len = size });
if (it == self->freelist.end())
{
// round size up to rdma_alloc_size (1 MiB by default)
size_t alloc_size = ((size + self->rdma_alloc_size - 1) / self->rdma_alloc_size) * self->rdma_alloc_size;
rdma_region_t *r = (rdma_region_t*)malloc_or_die(alloc_size + sizeof(rdma_region_t));
r->buf = r+1;
r->len = alloc_size;
r->mr = ibv_reg_mr(self->pd, r->buf, r->len, self->rdma_access);
if (!r->mr)
{
fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
exit(1);
}
self->regions.insert(r);
self->frags[r->buf] = (rdma_frag_t){ .rgn = r, .len = alloc_size, .is_free = true };
it = self->freelist.insert((rdma_free_t){ .len = alloc_size, .buf = r->buf }).first;
self->freebuffers += alloc_size;
}
void *ptr = it->buf;
auto & frag = self->frags.at(ptr);
self->freelist.erase(it);
assert(frag.len >= size && frag.is_free);
if (frag.len == frag.rgn->len)
{
self->freebuffers -= frag.rgn->len;
}
if (frag.len == size)
{
frag.is_free = false;
}
else
{
frag.len -= size;
ptr = (uint8_t*)ptr + frag.len;
self->freelist.insert((rdma_free_t){ .len = frag.len, .buf = frag.rgn->buf });
self->frags[ptr] = (rdma_frag_t){ .rgn = frag.rgn, .len = size, .is_free = false };
}
return ptr;
}
void rdma_malloc_free(rdma_allocator_t *self, void *buf)
{
auto frag_it = self->frags.find(buf);
if (frag_it == self->frags.end())
{
fprintf(stderr, "BUG: Attempt to double-free RDMA buffer fragment 0x%jx\n", (size_t)buf);
return;
}
auto prev_it = frag_it, next_it = frag_it;
if (frag_it != self->frags.begin())
prev_it--;
next_it++;
bool merge_back = prev_it != frag_it &&
prev_it->second.is_free &&
prev_it->second.rgn == frag_it->second.rgn &&
(uint8_t*)prev_it->first+prev_it->second.len == frag_it->first;
bool merge_next = next_it != self->frags.end() &&
next_it->second.is_free &&
next_it->second.rgn == frag_it->second.rgn &&
next_it->first == (uint8_t*)frag_it->first+frag_it->second.len;
if (merge_back && merge_next)
{
prev_it->second.len += frag_it->second.len + next_it->second.len;
self->freelist.erase((rdma_free_t){ .len = next_it->second.len, .buf = next_it->first });
self->frags.erase(next_it);
self->frags.erase(frag_it);
frag_it = prev_it;
}
else if (merge_back)
{
prev_it->second.len += frag_it->second.len;
self->frags.erase(frag_it);
frag_it = prev_it;
}
else if (merge_next)
{
frag_it->second.is_free = true;
frag_it->second.len += next_it->second.len;
self->freelist.erase((rdma_free_t){ .len = next_it->second.len, .buf = next_it->first });
self->frags.erase(next_it);
}
else
{
frag_it->second.is_free = true;
self->freelist.insert((rdma_free_t){ .len = frag_it->second.len, .buf = frag_it->first });
}
assert(frag_it->second.len <= frag_it->second.rgn->len);
if (frag_it->second.len == frag_it->second.rgn->len)
{
// The whole buffer is freed
self->freebuffers += frag_it->second.rgn->len;
if (self->freebuffers > self->rdma_max_unused)
{
rdma_malloc_free_unused_buffers(self, self->rdma_max_unused, false);
}
}
}
uint32_t rdma_malloc_get_lkey(rdma_allocator_t *self, void *buf)
{
auto frag_it = self->frags.upper_bound(buf);
if (frag_it != self->frags.begin())
{
frag_it--;
if ((uint8_t*)frag_it->first + frag_it->second.len > buf)
return frag_it->second.rgn->mr->lkey;
}
fprintf(stderr, "BUG: Attempt to use an unknown RDMA buffer fragment 0x%zx\n", (size_t)buf);
abort();
}

src/nfs/rdma_alloc.h Normal file (17 lines)

@ -0,0 +1,17 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
//
// Simple & stupid RDMA-enabled memory allocator (allocates buffers within ibv_mr's)
#pragma once
#include <infiniband/verbs.h>
#include <stdint.h>
struct rdma_allocator_t;
rdma_allocator_t *rdma_malloc_create(ibv_pd *pd, size_t rdma_alloc_size, size_t rdma_max_unused, int rdma_access);
void rdma_malloc_destroy(rdma_allocator_t *self);
void *rdma_malloc_alloc(rdma_allocator_t *self, size_t size);
void rdma_malloc_free(rdma_allocator_t *self, void *buf);
uint32_t rdma_malloc_get_lkey(rdma_allocator_t *self, void *buf);
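A minimal usage sketch of this allocator, assuming an already opened ibv_pd *pd; the 1 MiB / 500 MiB values mirror the nfs_rdma_alloc / nfs_rdma_gc defaults added to nfs_proxy_t above:
// Create the allocator, sub-allocate a buffer inside a registered region,
// use its lkey in a scatter-gather entry, then release everything.
rdma_allocator_t *alloc = rdma_malloc_create(pd, 1048576, 500*1048576, IBV_ACCESS_LOCAL_WRITE);
void *buf = rdma_malloc_alloc(alloc, 131072);
ibv_sge sge = {
    .addr = (uintptr_t)buf,
    .length = 131072,
    .lkey = rdma_malloc_get_lkey(alloc, buf),   // lkey of the ibv_mr owning buf
};
// ... post RDMA work requests referencing sge ...
rdma_malloc_free(alloc, buf);       // returns the fragment to the free list
rdma_malloc_destroy(alloc);         // requires all fragments to be freed first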