Compare commits
No commits in common. "109f51a015c65e03765f91a5d2dabfb1a8184e76" and "64db31ec107fcb827363149d56f2c0e61b52bf0b" have entirely different histories.
109f51a015
...
64db31ec10
|
@ -22,8 +22,6 @@ RUN apt-get update && \
|
|||
(echo "APT::Install-Recommends false;" > /etc/apt/apt.conf) && \
|
||||
apt-get update && \
|
||||
apt-get install -y e2fsprogs xfsprogs kmod iproute2 \
|
||||
# NFS mount dependencies
|
||||
nfs-common netbase \
|
||||
# dependencies of qemu-storage-daemon
|
||||
libnuma1 liburing2 libglib2.0-0 libfuse3-3 libaio1 libzstd1 libnettle8 \
|
||||
libgmp10 libhogweed6 libp11-kit0 libidn2-0 libunistring2 libtasn1-6 libpcre2-8-0 libffi8 && \
|
||||
|
|
|
@ -9,16 +9,8 @@ metadata:
|
|||
provisioner: csi.vitastor.io
|
||||
volumeBindingMode: Immediate
|
||||
parameters:
|
||||
# CSI driver can create block-based volumes and VitastorFS-based volumes
|
||||
# only VitastorFS-based volumes and raw block volumes (without FS) support ReadWriteMany mode
|
||||
# set this parameter to VitastorFS metadata volume name to use VitastorFS
|
||||
# if unset, block-based volumes will be created
|
||||
vitastorfs: ""
|
||||
# for block-based storage classes, pool ID may be either a string (name) or a number (ID)
|
||||
# for vitastorFS-based storage classes it must be a string - name of the default pool for FS data
|
||||
poolId: "testpool"
|
||||
# volume name prefix for block-based storage classes or NFS subdirectory (including /) for FS-based volumes
|
||||
volumePrefix: ""
|
||||
etcdVolumePrefix: ""
|
||||
poolId: "1"
|
||||
# you can choose other configuration file if you have it in the config map
|
||||
# different etcd URLs and prefixes should also be put in the config
|
||||
#configPath: "/etc/vitastor/vitastor.conf"
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"strconv"
|
||||
"time"
|
||||
"os"
|
||||
"io/ioutil"
|
||||
|
@ -67,10 +68,9 @@ func GetConnectionParams(params map[string]string) (map[string]string, error)
|
|||
{
|
||||
configPath = "/etc/vitastor/vitastor.conf"
|
||||
}
|
||||
ctxVars["configPath"] = configPath
|
||||
if (params["vitastorfs"] != "")
|
||||
else
|
||||
{
|
||||
ctxVars["vitastorfs"] = params["vitastorfs"]
|
||||
ctxVars["configPath"] = configPath
|
||||
}
|
||||
config := make(map[string]interface{})
|
||||
configFD, err := os.Open(configPath)
|
||||
|
@ -140,57 +140,33 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
|||
return nil, status.Error(codes.InvalidArgument, "volume capabilities is a required field")
|
||||
}
|
||||
|
||||
ctxVars, err := GetConnectionParams(req.Parameters)
|
||||
err := cs.checkCaps(volumeCapabilities)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = cs.checkCaps(volumeCapabilities, ctxVars["vitastorfs"] != "")
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pool := req.Parameters["poolId"]
|
||||
if (pool == "")
|
||||
etcdVolumePrefix := req.Parameters["etcdVolumePrefix"]
|
||||
poolId, _ := strconv.ParseUint(req.Parameters["poolId"], 10, 64)
|
||||
if (poolId == 0)
|
||||
{
|
||||
return nil, status.Error(codes.InvalidArgument, "poolId is missing in storage class configuration")
|
||||
}
|
||||
volumePrefix := req.Parameters["volumePrefix"]
|
||||
if (volumePrefix == "")
|
||||
{
|
||||
// Old name
|
||||
volumePrefix = req.Parameters["etcdVolumePrefix"]
|
||||
}
|
||||
volName := volumePrefix + req.GetName()
|
||||
|
||||
volName := etcdVolumePrefix + req.GetName()
|
||||
volSize := 1 * GB
|
||||
if capRange := req.GetCapacityRange(); capRange != nil
|
||||
{
|
||||
volSize = ((capRange.GetRequiredBytes() + MB - 1) / MB) * MB
|
||||
}
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
ctxVars, err := GetConnectionParams(req.Parameters)
|
||||
if (err != nil)
|
||||
{
|
||||
// Nothing to create, subdirectories are created during mounting
|
||||
// FIXME: It would be cool to support quotas some day and set it here
|
||||
if (req.VolumeContentSource.GetSnapshot() != nil)
|
||||
{
|
||||
return nil, status.Error(codes.InvalidArgument, "VitastorFS doesn't support snapshots")
|
||||
}
|
||||
ctxVars["name"] = volName
|
||||
ctxVars["pool"] = pool
|
||||
volumeIdJson, _ := json.Marshal(ctxVars)
|
||||
return &csi.CreateVolumeResponse{
|
||||
Volume: &csi.Volume{
|
||||
// Ugly, but VolumeContext isn't passed to DeleteVolume :-(
|
||||
VolumeId: string(volumeIdJson),
|
||||
CapacityBytes: volSize,
|
||||
},
|
||||
}, nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
args := []string{ "create", volName, "-s", fmt.Sprintf("%v", volSize), "--pool", pool }
|
||||
args := []string{ "create", volName, "-s", fmt.Sprintf("%v", volSize), "--pool", fmt.Sprintf("%v", poolId) }
|
||||
|
||||
// Support creation from snapshot
|
||||
var src *csi.VolumeContentSource
|
||||
|
@ -273,12 +249,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
// FIXME: Delete FS subdirectory
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
_, err = invokeCLI(ctxVars, []string{ "rm", volName })
|
||||
if (err != nil)
|
||||
{
|
||||
|
@ -313,25 +283,13 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req
|
|||
{
|
||||
return nil, status.Error(codes.InvalidArgument, "volumeId is nil")
|
||||
}
|
||||
volVars := make(map[string]string)
|
||||
err := json.Unmarshal([]byte(volumeID), &volVars)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
|
||||
}
|
||||
ctxVars, err := GetConnectionParams(volVars)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
|
||||
volumeCapabilities := req.GetVolumeCapabilities()
|
||||
if (volumeCapabilities == nil)
|
||||
{
|
||||
return nil, status.Error(codes.InvalidArgument, "volumeCapabilities is nil")
|
||||
}
|
||||
|
||||
err = cs.checkCaps(volumeCapabilities, ctxVars["vitastorfs"] != "")
|
||||
err := cs.checkCaps(volumeCapabilities)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
|
@ -344,7 +302,7 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (cs *ControllerServer) checkCaps(volumeCapabilities []*csi.VolumeCapability, fs bool) error
|
||||
func (cs *ControllerServer) checkCaps(volumeCapabilities []*csi.VolumeCapability) error
|
||||
{
|
||||
var volumeCapabilityAccessModes []*csi.VolumeCapability_AccessMode
|
||||
for _, mode := range []csi.VolumeCapability_AccessMode_Mode{
|
||||
|
@ -360,10 +318,6 @@ func (cs *ControllerServer) checkCaps(volumeCapabilities []*csi.VolumeCapability
|
|||
{
|
||||
if (capability.GetBlock() != nil)
|
||||
{
|
||||
if (fs)
|
||||
{
|
||||
return status.Errorf(codes.InvalidArgument, "%v not supported with FS-based volumes", capability)
|
||||
}
|
||||
for _, mode := range []csi.VolumeCapability_AccessMode_Mode{
|
||||
csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER,
|
||||
csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
|
||||
|
@ -374,12 +328,6 @@ func (cs *ControllerServer) checkCaps(volumeCapabilities []*csi.VolumeCapability
|
|||
}
|
||||
}
|
||||
|
||||
if (fs)
|
||||
{
|
||||
// All access modes including RWX are supported with FS-based volumes
|
||||
return nil
|
||||
}
|
||||
|
||||
capabilitySupport := false
|
||||
for _, capability := range volumeCapabilities
|
||||
{
|
||||
|
@ -394,7 +342,7 @@ func (cs *ControllerServer) checkCaps(volumeCapabilities []*csi.VolumeCapability
|
|||
|
||||
if (!capabilitySupport)
|
||||
{
|
||||
return status.Errorf(codes.InvalidArgument, "%v not supported", volumeCapabilities)
|
||||
return status.Errorf(codes.NotFound, "%v not supported", volumeCapabilities)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -486,12 +434,6 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
|||
{
|
||||
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
|
||||
}
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
return nil, status.Error(codes.InvalidArgument, "VitastorFS doesn't support snapshots")
|
||||
}
|
||||
|
||||
volName := ctxVars["name"]
|
||||
|
||||
// Create image using vitastor-cli
|
||||
|
@ -550,11 +492,6 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
return nil, status.Error(codes.InvalidArgument, "VitastorFS doesn't support snapshots")
|
||||
}
|
||||
|
||||
_, err = invokeCLI(ctxVars, []string{ "rm", volName+"@"+snapName })
|
||||
if (err != nil)
|
||||
{
|
||||
|
@ -586,11 +523,6 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
return nil, status.Error(codes.InvalidArgument, "VitastorFS doesn't support snapshots")
|
||||
}
|
||||
|
||||
inodeCfg, err := invokeList(ctxVars, volName+"@*", false)
|
||||
if (err != nil)
|
||||
{
|
||||
|
@ -654,16 +586,6 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
// Nothing to change
|
||||
// FIXME: Support quotas and change quota here
|
||||
return &csi.ControllerExpandVolumeResponse{
|
||||
CapacityBytes: req.CapacityRange.RequiredBytes,
|
||||
NodeExpansionRequired: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
inodeCfg, err := invokeList(ctxVars, volName, true)
|
||||
if (err != nil)
|
||||
{
|
||||
|
|
|
@ -5,15 +5,11 @@ package vitastor
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
@ -33,14 +29,13 @@ import (
|
|||
type NodeServer struct
|
||||
{
|
||||
*Driver
|
||||
useVduse bool
|
||||
stateDir string
|
||||
nfsStageDir string
|
||||
mounter mount.Interface
|
||||
useVduse bool
|
||||
stateDir string
|
||||
mounter mount.Interface
|
||||
restartInterval time.Duration
|
||||
mu sync.Mutex
|
||||
cond *sync.Cond
|
||||
volumeLocks map[string]bool
|
||||
mu sync.Mutex
|
||||
cond *sync.Cond
|
||||
volumeLocks map[string]bool
|
||||
}
|
||||
|
||||
type DeviceState struct
|
||||
|
@ -53,15 +48,6 @@ type DeviceState struct
|
|||
PidFile string `json:"pidFile"`
|
||||
}
|
||||
|
||||
type NfsState struct
|
||||
{
|
||||
ConfigPath string `json:"configPath"`
|
||||
FsName string `json:"fsName"`
|
||||
Pool string `json:"pool"`
|
||||
Path string `json:"path"`
|
||||
Port int `json:"port"`
|
||||
}
|
||||
|
||||
// NewNodeServer create new instance node
|
||||
func NewNodeServer(driver *Driver) *NodeServer
|
||||
{
|
||||
|
@ -74,17 +60,11 @@ func NewNodeServer(driver *Driver) *NodeServer
|
|||
{
|
||||
stateDir += "/"
|
||||
}
|
||||
nfsStageDir := os.Getenv("NFS_STAGE_DIR")
|
||||
if (nfsStageDir == "")
|
||||
{
|
||||
nfsStageDir = "/var/lib/kubelet/plugins/csi.vitastor.io/nfs"
|
||||
}
|
||||
ns := &NodeServer{
|
||||
Driver: driver,
|
||||
useVduse: checkVduseSupport(),
|
||||
stateDir: stateDir,
|
||||
nfsStageDir: nfsStageDir,
|
||||
mounter: mount.New(""),
|
||||
Driver: driver,
|
||||
useVduse: checkVduseSupport(),
|
||||
stateDir: stateDir,
|
||||
mounter: mount.New(""),
|
||||
volumeLocks: make(map[string]bool),
|
||||
}
|
||||
ns.cond = sync.NewCond(&ns.mu)
|
||||
|
@ -143,12 +123,12 @@ func (ns *NodeServer) restarter()
|
|||
func (ns *NodeServer) restoreVduseDaemons()
|
||||
{
|
||||
pattern := ns.stateDir+"vitastor-vduse-*.json"
|
||||
stateFiles, err := filepath.Glob(pattern)
|
||||
matches, err := filepath.Glob(pattern)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to list %s: %v", pattern, err)
|
||||
}
|
||||
if (len(stateFiles) == 0)
|
||||
if (len(matches) == 0)
|
||||
{
|
||||
return
|
||||
}
|
||||
|
@ -166,162 +146,59 @@ func (ns *NodeServer) restoreVduseDaemons()
|
|||
klog.Errorf("/sbin/vdpa -j dev list returned bad JSON (error %v): %v", err, string(devListJSON))
|
||||
return
|
||||
}
|
||||
for _, stateFile := range stateFiles
|
||||
for _, stateFile := range matches
|
||||
{
|
||||
ns.checkVduseState(stateFile, devs)
|
||||
}
|
||||
}
|
||||
vdpaId := filepath.Base(stateFile)
|
||||
vdpaId = vdpaId[0:len(vdpaId)-5]
|
||||
// Check if VDPA device is still added to the bus
|
||||
if (devs[vdpaId] == nil)
|
||||
{
|
||||
// Unused, clean it up
|
||||
unmapVduseById(ns.stateDir, vdpaId)
|
||||
continue
|
||||
}
|
||||
|
||||
func (ns *NodeServer) checkVduseState(stateFile string, devs map[string]interface{})
|
||||
{
|
||||
// Check if VDPA device is still added to the bus
|
||||
vdpaId := filepath.Base(stateFile)
|
||||
vdpaId = vdpaId[0:len(vdpaId)-5]
|
||||
if (devs[vdpaId] == nil)
|
||||
{
|
||||
// Unused, clean it up
|
||||
unmapVduseById(ns.stateDir, vdpaId)
|
||||
return
|
||||
}
|
||||
|
||||
// Read state file
|
||||
stateJSON, err := os.ReadFile(stateFile)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("error reading state file %v: %v", stateFile, err)
|
||||
return
|
||||
}
|
||||
var state DeviceState
|
||||
err = json.Unmarshal(stateJSON, &state)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("state file %v contains invalid JSON (error %v): %v", stateFile, err, string(stateJSON))
|
||||
return
|
||||
}
|
||||
|
||||
// Lock volume
|
||||
ns.lockVolume(state.ConfigPath+":block:"+state.Image)
|
||||
defer ns.unlockVolume(state.ConfigPath+":block:"+state.Image)
|
||||
|
||||
// Recheck state file after locking
|
||||
_, err = os.ReadFile(stateFile)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("state file %v disappeared, skipping volume", stateFile)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if the storage daemon is still active
|
||||
pidFile := ns.stateDir + vdpaId + ".pid"
|
||||
exists := false
|
||||
proc, err := findByPidFile(pidFile)
|
||||
if (err == nil)
|
||||
{
|
||||
exists = proc.Signal(syscall.Signal(0)) == nil
|
||||
}
|
||||
if (!exists)
|
||||
{
|
||||
// Restart daemon
|
||||
klog.Warningf("restarting storage daemon for volume %v (VDPA ID %v)", state.Image, vdpaId)
|
||||
err = startStorageDaemon(vdpaId, state.Image, pidFile, state.ConfigPath, state.Readonly)
|
||||
stateJSON, err := os.ReadFile(stateFile)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("failed to restart storage daemon for volume %v: %v", state.Image, err)
|
||||
klog.Warningf("error reading state file %v: %v", stateFile, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ns *NodeServer) restoreNfsDaemons()
|
||||
{
|
||||
pattern := ns.stateDir+"vitastor-nfs-*.json"
|
||||
stateFiles, err := filepath.Glob(pattern)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to list %s: %v", pattern, err)
|
||||
}
|
||||
if (len(stateFiles) == 0)
|
||||
{
|
||||
return
|
||||
}
|
||||
activeNFS, err := ns.listActiveNFS()
|
||||
if (err != nil)
|
||||
{
|
||||
return
|
||||
}
|
||||
// Check all state files and try to restore active mounts
|
||||
for _, stateFile := range stateFiles
|
||||
{
|
||||
ns.checkNfsState(stateFile, activeNFS)
|
||||
}
|
||||
}
|
||||
|
||||
func (ns *NodeServer) readNfsState(stateFile string, allowNotExists bool) (*NfsState, error)
|
||||
{
|
||||
stateJSON, err := os.ReadFile(stateFile)
|
||||
if (err != nil)
|
||||
{
|
||||
if (allowNotExists && os.IsNotExist(err))
|
||||
{
|
||||
return nil, nil
|
||||
}
|
||||
klog.Warningf("error reading state file %v: %v", stateFile, err)
|
||||
return nil, err
|
||||
}
|
||||
var state NfsState
|
||||
err = json.Unmarshal(stateJSON, &state)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("state file %v contains invalid JSON (error %v): %v", stateFile, err, string(stateJSON))
|
||||
return nil, err
|
||||
}
|
||||
return &state, nil
|
||||
}
|
||||
|
||||
func (ns *NodeServer) checkNfsState(stateFile string, activeNfs map[int][]string)
|
||||
{
|
||||
// Read state file
|
||||
state, err := ns.readNfsState(stateFile, false)
|
||||
if (err != nil)
|
||||
{
|
||||
return
|
||||
}
|
||||
// Lock FS
|
||||
ns.lockVolume(state.ConfigPath+":fs:"+state.FsName)
|
||||
defer ns.unlockVolume(state.ConfigPath+":fs:"+state.FsName)
|
||||
// Check if NFS at this port is still mounted
|
||||
pidFile := ns.stateDir + filepath.Base(stateFile)
|
||||
pidFile = pidFile[0:len(pidFile)-5] + ".pid"
|
||||
if (len(activeNfs[state.Port]) == 0)
|
||||
{
|
||||
// this is a stale state file, remove it
|
||||
klog.Warningf("state file %v contains stale mount at port %d, removing it", stateFile, state.Port)
|
||||
ns.stopNFS(stateFile, pidFile)
|
||||
return
|
||||
}
|
||||
// Check PID file
|
||||
exists := false
|
||||
proc, err := findByPidFile(pidFile)
|
||||
if (err == nil)
|
||||
{
|
||||
exists = proc.Signal(syscall.Signal(0)) == nil
|
||||
}
|
||||
if (!exists)
|
||||
{
|
||||
// Restart vitastor-nfs server
|
||||
klog.Warningf("restarting NFS server for FS %v at port %v", state.FsName, state.Port)
|
||||
_, _, err := system(
|
||||
"/usr/bin/vitastor-nfs", "start",
|
||||
"--pidfile", pidFile,
|
||||
"--bind", "127.0.0.1",
|
||||
"--port", fmt.Sprintf("%d", state.Port),
|
||||
"--fs", state.FsName,
|
||||
"--pool", state.Pool,
|
||||
"--portmap", "0",
|
||||
)
|
||||
var state DeviceState
|
||||
err = json.Unmarshal(stateJSON, &state)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("failed to restart NFS server for FS %v: %v", state.FsName, err)
|
||||
klog.Warningf("state file %v contains invalid JSON (error %v): %v", stateFile, err, string(stateJSON))
|
||||
continue
|
||||
}
|
||||
|
||||
ns.lockVolume(state.ConfigPath+":"+state.Image)
|
||||
|
||||
// Recheck state file after locking
|
||||
_, err = os.ReadFile(stateFile)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("state file %v disappeared, skipping volume", stateFile)
|
||||
ns.unlockVolume(state.ConfigPath+":"+state.Image)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the storage daemon is still active
|
||||
pidFile := ns.stateDir + vdpaId + ".pid"
|
||||
exists := false
|
||||
proc, err := findByPidFile(pidFile)
|
||||
if (err == nil)
|
||||
{
|
||||
exists = proc.Signal(syscall.Signal(0)) == nil
|
||||
}
|
||||
if (!exists)
|
||||
{
|
||||
// Restart daemon
|
||||
klog.Warningf("restarting storage daemon for volume %v (VDPA ID %v)", state.Image, vdpaId)
|
||||
_ = startStorageDaemon(vdpaId, state.Image, pidFile, state.ConfigPath, state.Readonly)
|
||||
}
|
||||
|
||||
ns.unlockVolume(state.ConfigPath+":"+state.Image)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -343,13 +220,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
|||
}
|
||||
volName := ctxVars["name"]
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
ns.lockVolume(ctxVars["configPath"]+":block:"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":block:"+volName)
|
||||
ns.lockVolume(ctxVars["configPath"]+":"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
|
||||
|
||||
targetPath := req.GetStagingTargetPath()
|
||||
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||
|
@ -536,13 +408,8 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
|
|||
}
|
||||
volName := ctxVars["name"]
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
ns.lockVolume(ctxVars["configPath"]+":block:"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":block:"+volName)
|
||||
ns.lockVolume(ctxVars["configPath"]+":"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
|
||||
|
||||
targetPath := req.GetStagingTargetPath()
|
||||
devicePath, _, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath)
|
||||
|
@ -595,153 +462,6 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
|
|||
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// Mount or check if NFS is already mounted
|
||||
func (ns *NodeServer) mountNFS(ctxVars map[string]string) (string, error)
|
||||
{
|
||||
sum := sha1.Sum([]byte(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"]))
|
||||
nfsHash := hex.EncodeToString(sum[:])
|
||||
stateFile := ns.stateDir+"vitastor-nfs-"+nfsHash+".json"
|
||||
pidFile := ns.stateDir+"vitastor-nfs-"+nfsHash+".pid"
|
||||
mountPath := ns.nfsStageDir+"/"+nfsHash
|
||||
state, err := ns.readNfsState(stateFile, true)
|
||||
if (state != nil)
|
||||
{
|
||||
return state.Path, nil
|
||||
}
|
||||
if (err != nil)
|
||||
{
|
||||
return "", err
|
||||
}
|
||||
err = os.MkdirAll(mountPath, 0777)
|
||||
if (err != nil)
|
||||
{
|
||||
return "", err
|
||||
}
|
||||
// Create a new mount
|
||||
state = &NfsState{
|
||||
ConfigPath: ctxVars["configPath"],
|
||||
FsName: ctxVars["vitastorfs"],
|
||||
Pool: ctxVars["pool"],
|
||||
Path: mountPath,
|
||||
}
|
||||
klog.Infof("starting new NFS server for FS %v", state.FsName)
|
||||
stdout, _, err := system(
|
||||
"/usr/bin/vitastor-nfs", "start",
|
||||
"--pidfile", pidFile,
|
||||
"--bind", "127.0.0.1",
|
||||
"--port", "auto",
|
||||
"--fs", state.FsName,
|
||||
"--pool", state.Pool,
|
||||
"--portmap", "0",
|
||||
)
|
||||
if (err != nil)
|
||||
{
|
||||
return "", err
|
||||
}
|
||||
match := regexp.MustCompile("Port: (\\d+)").FindStringSubmatch(string(stdout))
|
||||
if (match == nil)
|
||||
{
|
||||
klog.Errorf("failed to find port in vitastor-nfs output: %v", string(stdout))
|
||||
ns.stopNFS(stateFile, pidFile)
|
||||
return "", fmt.Errorf("failed to find port in vitastor-nfs output (bad vitastor-nfs version?)")
|
||||
}
|
||||
port, _ := strconv.ParseUint(match[1], 0, 16)
|
||||
state.Port = int(port)
|
||||
// Write state file
|
||||
stateJSON, _ := json.Marshal(state)
|
||||
err = os.WriteFile(stateFile, stateJSON, 0600)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to write state file %v", stateFile)
|
||||
ns.stopNFS(stateFile, pidFile)
|
||||
return "", err
|
||||
}
|
||||
// Mount NFS
|
||||
_, _, err = system(
|
||||
"mount", "-t", "nfs", "127.0.0.1:/", state.Path,
|
||||
"-o", fmt.Sprintf("port=%d,mountport=%d,nfsvers=3,soft,nolock,tcp", port, port),
|
||||
)
|
||||
if (err != nil)
|
||||
{
|
||||
ns.stopNFS(stateFile, pidFile)
|
||||
return "", err
|
||||
}
|
||||
return state.Path, nil
|
||||
}
|
||||
|
||||
// Mount or check if NFS is already mounted
|
||||
func (ns *NodeServer) checkStopNFS(ctxVars map[string]string)
|
||||
{
|
||||
sum := sha1.Sum([]byte(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"]))
|
||||
nfsHash := hex.EncodeToString(sum[:])
|
||||
stateFile := ns.stateDir+"vitastor-nfs-"+nfsHash+".json"
|
||||
pidFile := ns.stateDir+"vitastor-nfs-"+nfsHash+".pid"
|
||||
mountPath := ns.nfsStageDir+"/"+nfsHash
|
||||
state, err := ns.readNfsState(stateFile, true)
|
||||
if (state == nil)
|
||||
{
|
||||
return
|
||||
}
|
||||
activeNFS, err := ns.listActiveNFS()
|
||||
if (err != nil)
|
||||
{
|
||||
return
|
||||
}
|
||||
if (len(activeNFS[state.Port]) > 0)
|
||||
{
|
||||
return
|
||||
}
|
||||
// All volume mounts are detached, unmount the root mount and kill the server
|
||||
err = mount.CleanupMountPoint(mountPath, ns.mounter, false)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to unmount %v: %v", mountPath, err)
|
||||
return
|
||||
}
|
||||
ns.stopNFS(stateFile, pidFile)
|
||||
}
|
||||
|
||||
func (ns *NodeServer) stopNFS(stateFile, pidFile string)
|
||||
{
|
||||
err := killByPidFile(pidFile)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to kill process with pid from %v: %v", pidFile, err)
|
||||
}
|
||||
os.Remove(pidFile)
|
||||
os.Remove(stateFile)
|
||||
}
|
||||
|
||||
func (ns *NodeServer) listActiveNFS() (map[int][]string, error)
|
||||
{
|
||||
mounts, err := mount.ParseMountInfo("/proc/self/mountinfo")
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to list mounts: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
activeNFS := make(map[int][]string)
|
||||
for _, mount := range mounts
|
||||
{
|
||||
// Volume mounts always refer to subpaths
|
||||
if (mount.FsType == "nfs" && mount.Root != "/")
|
||||
{
|
||||
for _, opt := range mount.MountOptions
|
||||
{
|
||||
if (strings.HasPrefix(opt, "port="))
|
||||
{
|
||||
port64, err := strconv.ParseUint(opt[5:], 10, 16)
|
||||
if (err == nil)
|
||||
{
|
||||
activeNFS[int(port64)] = append(activeNFS[int(port64)], mount.MountPoint)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return activeNFS, nil
|
||||
}
|
||||
|
||||
// NodePublishVolume mounts the volume mounted to the staging path to the target path
|
||||
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error)
|
||||
{
|
||||
|
@ -760,39 +480,28 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
}
|
||||
volName := ctxVars["name"]
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
ns.lockVolume(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"])
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"])
|
||||
}
|
||||
else
|
||||
{
|
||||
ns.lockVolume(ctxVars["configPath"]+":block:"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":block:"+volName)
|
||||
}
|
||||
ns.lockVolume(ctxVars["configPath"]+":"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
|
||||
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
targetPath := req.GetTargetPath()
|
||||
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||
|
||||
if (ctxVars["vitastorfs"] == "")
|
||||
// Check that stagingTargetPath is mounted
|
||||
notmnt, err := mount.IsNotMountPoint(ns.mounter, stagingTargetPath)
|
||||
if (err != nil)
|
||||
{
|
||||
// Check that stagingTargetPath is mounted
|
||||
notmnt, err := mount.IsNotMountPoint(ns.mounter, stagingTargetPath)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("staging path %v is not mounted: %w", stagingTargetPath, err)
|
||||
return nil, fmt.Errorf("staging path %v is not mounted: %w", stagingTargetPath, err)
|
||||
}
|
||||
else if (notmnt)
|
||||
{
|
||||
klog.Errorf("staging path %v is not mounted", stagingTargetPath)
|
||||
return nil, fmt.Errorf("staging path %v is not mounted", stagingTargetPath)
|
||||
}
|
||||
klog.Errorf("staging path %v is not mounted: %w", stagingTargetPath, err)
|
||||
return nil, fmt.Errorf("staging path %v is not mounted: %w", stagingTargetPath, err)
|
||||
}
|
||||
else if (notmnt)
|
||||
{
|
||||
klog.Errorf("staging path %v is not mounted", stagingTargetPath)
|
||||
return nil, fmt.Errorf("staging path %v is not mounted", stagingTargetPath)
|
||||
}
|
||||
|
||||
// Check that targetPath is not already mounted
|
||||
notmnt, err := mount.IsNotMountPoint(ns.mounter, targetPath)
|
||||
notmnt, err = mount.IsNotMountPoint(ns.mounter, targetPath)
|
||||
if (err != nil)
|
||||
{
|
||||
if (os.IsNotExist(err))
|
||||
|
@ -833,24 +542,6 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
return nil, fmt.Errorf("target path %s is already mounted", targetPath)
|
||||
}
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
nfspath, err := ns.mountNFS(ctxVars)
|
||||
if (err != nil)
|
||||
{
|
||||
ns.checkStopNFS(ctxVars)
|
||||
return nil, err
|
||||
}
|
||||
// volName should include prefix
|
||||
stagingTargetPath = nfspath+"/"+volName
|
||||
err = os.MkdirAll(stagingTargetPath, 0777)
|
||||
if (err != nil && !os.IsExist(err))
|
||||
{
|
||||
ns.checkStopNFS(ctxVars)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
execArgs := []string{"--bind", stagingTargetPath, targetPath}
|
||||
if (req.GetReadonly())
|
||||
{
|
||||
|
@ -862,10 +553,6 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
out, err := cmd.Output()
|
||||
if (err != nil)
|
||||
{
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
ns.checkStopNFS(ctxVars)
|
||||
}
|
||||
return nil, fmt.Errorf("Error running mount %v: %s", strings.Join(execArgs, " "), out)
|
||||
}
|
||||
|
||||
|
@ -885,16 +572,8 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
|||
}
|
||||
volName := ctxVars["name"]
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
ns.lockVolume(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"])
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":fs:"+ctxVars["vitastorfs"])
|
||||
}
|
||||
else
|
||||
{
|
||||
ns.lockVolume(ctxVars["configPath"]+":block:"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":block:"+volName)
|
||||
}
|
||||
ns.lockVolume(ctxVars["configPath"]+":"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
|
||||
|
||||
targetPath := req.GetTargetPath()
|
||||
devicePath, _, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath)
|
||||
|
@ -921,11 +600,6 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if (ctxVars["vitastorfs"] != "")
|
||||
{
|
||||
ns.checkStopNFS(ctxVars)
|
||||
}
|
||||
|
||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -61,10 +61,6 @@ pkg_check_modules(ISAL libisal)
|
|||
if (ISAL_LIBRARIES)
|
||||
add_definitions(-DWITH_ISAL)
|
||||
endif (ISAL_LIBRARIES)
|
||||
pkg_check_modules(RDMACM librdmacm)
|
||||
if (RDMACM_LIBRARIES)
|
||||
add_definitions(-DWITH_RDMACM)
|
||||
endif (RDMACM_LIBRARIES)
|
||||
|
||||
add_custom_target(build_tests)
|
||||
add_custom_target(test
|
||||
|
|
|
@ -5,7 +5,6 @@ project(vitastor)
|
|||
# vitastor-nfs
|
||||
add_executable(vitastor-nfs
|
||||
nfs_proxy.cpp
|
||||
nfs_proxy_rdma.cpp
|
||||
nfs_block.cpp
|
||||
nfs_kv.cpp
|
||||
nfs_kv_create.cpp
|
||||
|
@ -22,10 +21,8 @@ add_executable(vitastor-nfs
|
|||
nfs_fsstat.cpp
|
||||
nfs_mount.cpp
|
||||
nfs_portmap.cpp
|
||||
rdma_alloc.cpp
|
||||
../util/sha256.c
|
||||
proto/xdr_impl.cpp
|
||||
proto/rpc_rdma_xdr.cpp
|
||||
proto/rpc_xdr.cpp
|
||||
proto/portmap_xdr.cpp
|
||||
proto/nfs_xdr.cpp
|
||||
|
@ -33,5 +30,4 @@ add_executable(vitastor-nfs
|
|||
target_link_libraries(vitastor-nfs
|
||||
vitastor_client
|
||||
vitastor_kv
|
||||
${RDMACM_LIBRARIES}
|
||||
)
|
||||
|
|
|
@ -315,7 +315,8 @@ static int block_nfs3_read_proc(void *opaque, rpc_op_t *rop)
|
|||
if (aligned_count % alignment)
|
||||
aligned_count = aligned_count + alignment - (aligned_count % alignment);
|
||||
aligned_count -= aligned_offset;
|
||||
void *buf = self->malloc_or_rdma(rop, aligned_count);
|
||||
void *buf = malloc_or_die(aligned_count);
|
||||
xdr_add_malloc(rop->xdrs, buf);
|
||||
cluster_op_t *op = new cluster_op_t;
|
||||
op->opcode = OSD_OP_READ;
|
||||
op->inode = ino_it->second;
|
||||
|
@ -334,22 +335,10 @@ static int block_nfs3_read_proc(void *opaque, rpc_op_t *rop)
|
|||
}
|
||||
else
|
||||
{
|
||||
nfs_client_t *self = (nfs_client_t*)rop->client;
|
||||
auto & reply_ok = reply->resok;
|
||||
// reply_ok.data.data is already set above
|
||||
reply_ok.count = reply_ok.data.size;
|
||||
reply_ok.eof = 0;
|
||||
if (self->rdma_conn)
|
||||
{
|
||||
// FIXME Linux NFS RDMA transport has a bug - when the reply
|
||||
// doesn't contain post_op_attr, the data gets offsetted by
|
||||
// 84 bytes (size of attributes)...
|
||||
// So we have to fill it with RDMA. :-(
|
||||
reply_ok.file_attributes = (post_op_attr){
|
||||
.attributes_follow = 1,
|
||||
.attributes = get_file_attributes(self, op->inode),
|
||||
};
|
||||
}
|
||||
}
|
||||
rpc_queue_reply(rop);
|
||||
delete op;
|
||||
|
|
|
@ -65,7 +65,7 @@ int kv_map_type(const std::string & type)
|
|||
(type == "fifo" ? NF3FIFO : -1)))))));
|
||||
}
|
||||
|
||||
fattr3 get_kv_attributes(nfs_proxy_t *proxy, uint64_t ino, json11::Json attrs)
|
||||
fattr3 get_kv_attributes(nfs_client_t *self, uint64_t ino, json11::Json attrs)
|
||||
{
|
||||
auto type = kv_map_type(attrs["type"].string_value());
|
||||
auto mode = attrs["mode"].uint64_value();
|
||||
|
@ -86,7 +86,7 @@ fattr3 get_kv_attributes(nfs_proxy_t *proxy, uint64_t ino, json11::Json attrs)
|
|||
.rdev = (type == NF3BLK || type == NF3CHR
|
||||
? (specdata3){ (uint32_t)attrs["major"].uint64_value(), (uint32_t)attrs["minor"].uint64_value() }
|
||||
: (specdata3){}),
|
||||
.fsid = proxy->fsid,
|
||||
.fsid = self->parent->fsid,
|
||||
.fileid = ino,
|
||||
.atime = atime,
|
||||
.mtime = mtime,
|
||||
|
@ -349,27 +349,6 @@ kv_fs_state_t::~kv_fs_state_t()
|
|||
}
|
||||
}
|
||||
|
||||
void kv_fs_state_t::write_inode(inode_t ino, json11::Json value, bool hack_cache, std::function<void(int)> cb, std::function<bool(int, const std::string &)> cas_cb)
|
||||
{
|
||||
if (!proxy->rdma_context)
|
||||
{
|
||||
proxy->db->set(kv_inode_key(ino), value.dump(), cb, cas_cb);
|
||||
return;
|
||||
}
|
||||
// FIXME Linux NFS RDMA transport has a bug - it corrupts the data (by offsetting it 84 bytes)
|
||||
// when the READ reply doesn't contain post_op_attr. So we have to fill post_op_attr with RDMA. :-(
|
||||
// So we at least cache it to not repeat K/V requests every read.
|
||||
read_hack_cache.erase(ino);
|
||||
proxy->db->set(kv_inode_key(ino), value.dump(), [=](int res)
|
||||
{
|
||||
if (hack_cache || res != 0)
|
||||
read_hack_cache.erase(ino);
|
||||
else
|
||||
read_hack_cache[ino] = value;
|
||||
cb(res);
|
||||
}, cas_cb);
|
||||
}
|
||||
|
||||
void kv_fs_state_t::update_inode(inode_t ino, bool allow_cache, std::function<void(json11::Json::object &)> change, std::function<void(int)> cb)
|
||||
{
|
||||
// FIXME: Use "update" query
|
||||
|
@ -377,15 +356,12 @@ void kv_fs_state_t::update_inode(inode_t ino, bool allow_cache, std::function<vo
|
|||
{
|
||||
if (!res)
|
||||
{
|
||||
read_hack_cache.erase(ino);
|
||||
auto ientry = attrs.object_items();
|
||||
change(ientry);
|
||||
bool *found = new bool;
|
||||
*found = true;
|
||||
json11::Json ientry_json(ientry);
|
||||
proxy->db->set(kv_inode_key(ino), ientry_json.dump(), [=](int res)
|
||||
proxy->db->set(kv_inode_key(ino), json11::Json(ientry).dump(), [=](int res)
|
||||
{
|
||||
read_hack_cache.erase(ino);
|
||||
if (!*found)
|
||||
res = -ENOENT;
|
||||
delete found;
|
||||
|
@ -408,8 +384,6 @@ void kv_fs_state_t::update_inode(inode_t ino, bool allow_cache, std::function<vo
|
|||
|
||||
void kv_fs_state_t::touch_inodes()
|
||||
{
|
||||
// Clear RDMA read fattr3 "hack" cache every second
|
||||
read_hack_cache.clear();
|
||||
std::set<inode_t> q = std::move(touch_queue);
|
||||
for (auto ino: q)
|
||||
{
|
||||
|
|
|
@ -75,7 +75,6 @@ struct kv_fs_state_t
|
|||
std::map<inode_t, kv_inode_extend_t> extends;
|
||||
std::set<inode_t> touch_queue;
|
||||
std::map<inode_t, uint64_t> volume_removed;
|
||||
std::map<inode_t, json11::Json> read_hack_cache;
|
||||
uint64_t volume_stats_ctr = 0;
|
||||
uint64_t volume_touch_ctr = 0;
|
||||
|
||||
|
@ -88,7 +87,6 @@ struct kv_fs_state_t
|
|||
void upgrade_db(std::function<void(int)> cb);
|
||||
void defrag_all(json11::Json cfg, std::function<void(int)> cb);
|
||||
void defrag_volume(inode_t ino, bool no_rm, bool dry_run, std::function<void(int, uint64_t, uint64_t, uint64_t)> cb);
|
||||
void write_inode(inode_t ino, json11::Json value, bool hack_cache, std::function<void(int)> cb, std::function<bool(int, const std::string &)> cas_cb);
|
||||
~kv_fs_state_t();
|
||||
};
|
||||
|
||||
|
@ -118,7 +116,7 @@ nfstime3 nfstime_from_str(const std::string & s);
|
|||
std::string nfstime_to_str(nfstime3 t);
|
||||
std::string nfstime_now_str();
|
||||
int kv_map_type(const std::string & type);
|
||||
fattr3 get_kv_attributes(nfs_proxy_t *proxy, uint64_t ino, json11::Json attrs);
|
||||
fattr3 get_kv_attributes(nfs_client_t *self, uint64_t ino, json11::Json attrs);
|
||||
std::string kv_direntry_key(uint64_t dir_ino, const std::string & filename);
|
||||
std::string kv_direntry_filename(const std::string & key);
|
||||
std::string kv_inode_prefix_key(uint64_t ino, const char *prefix);
|
||||
|
|
|
@ -143,7 +143,7 @@ resume_2:
|
|||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
st->self->parent->kvfs->write_inode(st->new_id, st->attrs, false, [st](int res)
|
||||
st->self->parent->db->set(kv_inode_key(st->new_id), st->attrs.dump().c_str(), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
kv_continue_create(st, 3);
|
||||
|
@ -267,7 +267,7 @@ template<class T, class Tok> static void kv_create_reply(kv_create_state *st, in
|
|||
},
|
||||
.obj_attributes = {
|
||||
.attributes_follow = 1,
|
||||
.attributes = get_kv_attributes(st->self->parent, st->new_id, st->attrs),
|
||||
.attributes = get_kv_attributes(st->self, st->new_id, st->attrs),
|
||||
},
|
||||
},
|
||||
};
|
||||
|
|
|
@ -68,7 +68,7 @@ int kv_nfs3_getattr_proc(void *opaque, rpc_op_t *rop)
|
|||
*reply = (GETATTR3res){
|
||||
.status = NFS3_OK,
|
||||
.resok = (GETATTR3resok){
|
||||
.obj_attributes = get_kv_attributes(self->parent, ino, attrs),
|
||||
.obj_attributes = get_kv_attributes(self, ino, attrs),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
|
@ -102,7 +102,7 @@ resume_2:
|
|||
new_ientry["ctime"] = nfstime_now_str();
|
||||
st->ientry = new_ientry;
|
||||
}
|
||||
st->self->parent->kvfs->write_inode(st->ino, st->ientry, false, [st](int res)
|
||||
st->self->parent->db->set(kv_inode_key(st->ino), st->ientry.dump(), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_link(st, 3);
|
||||
|
@ -180,7 +180,7 @@ int kv_nfs3_link_proc(void *opaque, rpc_op_t *rop)
|
|||
.resok = (LINK3resok){
|
||||
.file_attributes = (post_op_attr){
|
||||
.attributes_follow = 1,
|
||||
.attributes = get_kv_attributes(st->self->parent, st->ino, st->ientry),
|
||||
.attributes = get_kv_attributes(st->self, st->ino, st->ientry),
|
||||
},
|
||||
},
|
||||
};
|
||||
|
|
|
@ -55,7 +55,7 @@ int kv_nfs3_lookup_proc(void *opaque, rpc_op_t *rop)
|
|||
.object = xdr_copy_string(rop->xdrs, kv_fh(ino)),
|
||||
.obj_attributes = {
|
||||
.attributes_follow = 1,
|
||||
.attributes = get_kv_attributes(self->parent, ino, ientry),
|
||||
.attributes = get_kv_attributes(self, ino, ientry),
|
||||
},
|
||||
},
|
||||
};
|
||||
|
@ -91,14 +91,10 @@ int kv_nfs3_readlink_proc(void *opaque, rpc_op_t *rop)
|
|||
}
|
||||
else
|
||||
{
|
||||
std::string link_target = attrs["symlink"].string_value();
|
||||
char *cp = (char*)self->malloc_or_rdma(rop, link_target.size()+1);
|
||||
memcpy(cp, link_target.data(), link_target.size());
|
||||
cp[link_target.size()] = 0;
|
||||
*reply = (READLINK3res){
|
||||
.status = NFS3_OK,
|
||||
.resok = (READLINK3resok){
|
||||
.data = (xdr_string_t){ link_target.size(), cp },
|
||||
.data = xdr_copy_string(rop->xdrs, attrs["symlink"].string_value()),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
|
@ -37,7 +37,6 @@ static void nfs_kv_continue_read(nfs_kv_read_state *st, int state)
|
|||
else if (state == 1) goto resume_1;
|
||||
else if (state == 2) goto resume_2;
|
||||
else if (state == 3) goto resume_3;
|
||||
else if (state == 4) goto resume_4;
|
||||
else
|
||||
{
|
||||
fprintf(stderr, "BUG: invalid state in nfs_kv_continue_read()");
|
||||
|
@ -97,7 +96,7 @@ resume_1:
|
|||
}
|
||||
read_size += sizeof(shared_file_header_t);
|
||||
assert(!st->aligned_buf);
|
||||
st->aligned_buf = (uint8_t*)st->self->malloc_or_rdma(st->rop, read_size);
|
||||
st->aligned_buf = (uint8_t*)malloc_or_die(read_size);
|
||||
st->buf = st->aligned_buf + sizeof(shared_file_header_t) + st->offset;
|
||||
st->op->iov.push_back(st->aligned_buf, read_size);
|
||||
st->op->len = align_up(read_offset+read_size) - st->op->offset;
|
||||
|
@ -118,7 +117,7 @@ resume_1:
|
|||
resume_2:
|
||||
if (st->res < 0)
|
||||
{
|
||||
st->self->free_or_rdma(st->rop, st->aligned_buf);
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res);
|
||||
|
@ -132,7 +131,7 @@ resume_2:
|
|||
" 0x%jx offset 0x%jx: probably a read/write conflict, retrying\n",
|
||||
st->ino, st->ientry["shared_ino"].uint64_value(), st->ientry["shared_offset"].uint64_value());
|
||||
st->retry++;
|
||||
st->self->free_or_rdma(st->rop, st->aligned_buf);
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
st->allow_cache = false;
|
||||
goto resume_0;
|
||||
|
@ -142,39 +141,10 @@ resume_2:
|
|||
return;
|
||||
}
|
||||
}
|
||||
else if (st->self->rdma_conn)
|
||||
{
|
||||
// Take ientry from read_hack_cache for RDMA connections
|
||||
{
|
||||
auto rh_it = st->self->parent->kvfs->read_hack_cache.find(st->ino);
|
||||
if (rh_it != st->self->parent->kvfs->read_hack_cache.end())
|
||||
{
|
||||
st->ientry = rh_it->second;
|
||||
}
|
||||
}
|
||||
if (st->ientry.is_null())
|
||||
{
|
||||
kv_read_inode(st->self->parent, st->ino, [st](int res, const std::string & value, json11::Json attrs)
|
||||
{
|
||||
st->res = res;
|
||||
st->ientry = attrs;
|
||||
nfs_kv_continue_read(st, 4);
|
||||
}, st->allow_cache);
|
||||
return;
|
||||
resume_4:
|
||||
if (st->res < 0 || kv_map_type(st->ientry["type"].string_value()) != NF3REG)
|
||||
{
|
||||
auto cb = std::move(st->cb);
|
||||
cb(st->res < 0 ? st->res : -EINVAL);
|
||||
return;
|
||||
}
|
||||
st->self->parent->kvfs->read_hack_cache[st->ino] = st->ientry;
|
||||
}
|
||||
}
|
||||
st->aligned_offset = align_down(st->offset);
|
||||
st->aligned_size = align_up(st->offset+st->size) - st->aligned_offset;
|
||||
assert(!st->aligned_buf);
|
||||
st->aligned_buf = (uint8_t*)st->self->malloc_or_rdma(st->rop, st->aligned_size);
|
||||
st->aligned_buf = (uint8_t*)malloc_or_die(st->aligned_size);
|
||||
st->buf = st->aligned_buf + st->offset - st->aligned_offset;
|
||||
st->op = new cluster_op_t;
|
||||
st->op->opcode = OSD_OP_READ;
|
||||
|
@ -193,7 +163,7 @@ resume_4:
|
|||
resume_3:
|
||||
if (st->res < 0)
|
||||
{
|
||||
st->self->free_or_rdma(st->rop, st->aligned_buf);
|
||||
free(st->aligned_buf);
|
||||
st->aligned_buf = NULL;
|
||||
}
|
||||
auto cb = std::move(st->cb);
|
||||
|
@ -224,21 +194,11 @@ int kv_nfs3_read_proc(void *opaque, rpc_op_t *rop)
|
|||
*reply = (READ3res){ .status = vitastor_nfs_map_err(res) };
|
||||
if (res == 0)
|
||||
{
|
||||
xdr_add_malloc(st->rop->xdrs, st->aligned_buf);
|
||||
reply->resok.data.data = (char*)st->buf;
|
||||
reply->resok.data.size = st->size;
|
||||
reply->resok.count = st->size;
|
||||
reply->resok.eof = st->eof;
|
||||
if (st->self->rdma_conn)
|
||||
{
|
||||
// FIXME Linux NFS RDMA transport has a bug - when the reply
|
||||
// doesn't contain post_op_attr, the data gets offsetted by
|
||||
// 84 bytes (size of attributes)...
|
||||
// So we have to fill it with RDMA. :-(
|
||||
reply->resok.file_attributes = (post_op_attr){
|
||||
.attributes_follow = 1,
|
||||
.attributes = get_kv_attributes(st->self->parent, st->ino, st->ientry),
|
||||
};
|
||||
}
|
||||
}
|
||||
rpc_queue_reply(st->rop);
|
||||
delete st;
|
||||
|
|
|
@ -57,7 +57,7 @@ static void kv_getattr_next(nfs_kv_readdir_state *st)
|
|||
st->entries[idx].name_attributes = (post_op_attr){
|
||||
// FIXME: maybe do not read parent attributes and leave them to a GETATTR?
|
||||
.attributes_follow = 1,
|
||||
.attributes = get_kv_attributes(st->self->parent, st->entries[idx].fileid, ientry),
|
||||
.attributes = get_kv_attributes(st->self, st->entries[idx].fileid, ientry),
|
||||
};
|
||||
}
|
||||
st->getattr_running--;
|
||||
|
@ -126,7 +126,7 @@ resume_1:
|
|||
dot.fileid = st->dir_ino;
|
||||
dot.name_attributes = (post_op_attr){
|
||||
.attributes_follow = 1,
|
||||
.attributes = get_kv_attributes(st->self->parent, st->dir_ino, st->ientry),
|
||||
.attributes = get_kv_attributes(st->self, st->dir_ino, st->ientry),
|
||||
};
|
||||
dot.name_handle = (post_op_fh3){
|
||||
.handle_follows = 1,
|
||||
|
@ -169,7 +169,7 @@ resume_2:
|
|||
dotdot.name_attributes = (post_op_attr){
|
||||
// FIXME: maybe do not read parent attributes and leave them to a GETATTR?
|
||||
.attributes_follow = 1,
|
||||
.attributes = get_kv_attributes(st->self->parent,
|
||||
.attributes = get_kv_attributes(st->self,
|
||||
st->parent_ino ? st->parent_ino : st->dir_ino,
|
||||
st->parent_ino ? st->parent_ientry : st->ientry),
|
||||
};
|
||||
|
|
|
@ -197,7 +197,7 @@ resume_5:
|
|||
auto copy = st->ientry.object_items();
|
||||
copy["nlink"] = st->ientry["nlink"].uint64_value()-1;
|
||||
copy["ctime"] = nfstime_now_str();
|
||||
st->self->parent->kvfs->write_inode(st->ino, copy, false, [st](int res)
|
||||
st->self->parent->db->set(kv_inode_key(st->ino), json11::Json(copy).dump(), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_delete(st, 6);
|
||||
|
|
|
@ -240,7 +240,7 @@ resume_7:
|
|||
copy["nlink"] = st->new_ientry["nlink"].uint64_value()-1;
|
||||
copy["ctime"] = nfstime_now_str();
|
||||
copy.erase("verf");
|
||||
st->self->parent->kvfs->write_inode(st->new_direntry["ino"].uint64_value(), copy, false, [st](int res)
|
||||
st->self->parent->db->set(kv_inode_key(st->new_direntry["ino"].uint64_value()), json11::Json(copy).dump(), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_rename(st, 8);
|
||||
|
@ -328,7 +328,7 @@ resume_11:
|
|||
ientry_new["parent_ino"] = st->new_dir_ino;
|
||||
ientry_new["ctime"] = nfstime_now_str();
|
||||
ientry_new.erase("verf");
|
||||
st->self->parent->kvfs->write_inode(st->old_direntry["ino"].uint64_value(), ientry_new, false, [st](int res)
|
||||
st->self->parent->db->set(kv_inode_key(st->old_direntry["ino"].uint64_value()), json11::Json(ientry_new).dump(), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_rename(st, 12);
|
||||
|
|
|
@ -84,7 +84,7 @@ resume_1:
|
|||
}
|
||||
st->new_attrs.erase("verf");
|
||||
st->new_attrs["ctime"] = nfstime_now_str();
|
||||
st->self->parent->kvfs->write_inode(st->ino, st->new_attrs, false, [st](int res)
|
||||
st->self->parent->db->set(kv_inode_key(st->ino), json11::Json(st->new_attrs).dump(), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_setattr(st, 2);
|
||||
|
@ -190,7 +190,7 @@ int kv_nfs3_setattr_proc(void *opaque, rpc_op_t *rop)
|
|||
.obj_wcc = (wcc_data){
|
||||
.after = (post_op_attr){
|
||||
.attributes_follow = 1,
|
||||
.attributes = get_kv_attributes(st->self->parent, st->ino, st->new_attrs),
|
||||
.attributes = get_kv_attributes(st->self, st->ino, st->new_attrs),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -553,7 +553,7 @@ static void nfs_do_align_write(nfs_kv_write_state *st, uint64_t ino, uint64_t of
|
|||
}
|
||||
}
|
||||
|
||||
static json11::Json new_normal_ientry(nfs_kv_write_state *st)
|
||||
static std::string new_normal_ientry(nfs_kv_write_state *st)
|
||||
{
|
||||
auto ni = st->ientry.object_items();
|
||||
ni.erase("empty");
|
||||
|
@ -564,10 +564,10 @@ static json11::Json new_normal_ientry(nfs_kv_write_state *st)
|
|||
ni["size"] = st->ext->cur_extend;
|
||||
ni["ctime"] = ni["mtime"] = nfstime_now_str();
|
||||
ni.erase("verf");
|
||||
return ni;
|
||||
return json11::Json(ni).dump();
|
||||
}
|
||||
|
||||
static json11::Json new_moved_ientry(nfs_kv_write_state *st)
|
||||
static std::string new_moved_ientry(nfs_kv_write_state *st)
|
||||
{
|
||||
auto ni = st->ientry.object_items();
|
||||
ni.erase("empty");
|
||||
|
@ -578,10 +578,10 @@ static json11::Json new_moved_ientry(nfs_kv_write_state *st)
|
|||
ni["size"] = st->new_size;
|
||||
ni["ctime"] = ni["mtime"] = nfstime_now_str();
|
||||
ni.erase("verf");
|
||||
return ni;
|
||||
return json11::Json(ni).dump();
|
||||
}
|
||||
|
||||
static json11::Json new_shared_ientry(nfs_kv_write_state *st)
|
||||
static std::string new_shared_ientry(nfs_kv_write_state *st)
|
||||
{
|
||||
auto ni = st->ientry.object_items();
|
||||
ni.erase("empty");
|
||||
|
@ -589,10 +589,10 @@ static json11::Json new_shared_ientry(nfs_kv_write_state *st)
|
|||
ni["ctime"] = ni["mtime"] = nfstime_now_str();
|
||||
ni["shared_ver"] = ni["shared_ver"].uint64_value()+1;
|
||||
ni.erase("verf");
|
||||
return ni;
|
||||
return json11::Json(ni).dump();
|
||||
}
|
||||
|
||||
static json11::Json new_unshared_ientry(nfs_kv_write_state *st)
|
||||
static std::string new_unshared_ientry(nfs_kv_write_state *st)
|
||||
{
|
||||
auto ni = st->ientry.object_items();
|
||||
ni.erase("empty");
|
||||
|
@ -602,7 +602,7 @@ static json11::Json new_unshared_ientry(nfs_kv_write_state *st)
|
|||
ni.erase("shared_ver");
|
||||
ni["ctime"] = ni["mtime"] = nfstime_now_str();
|
||||
ni.erase("verf");
|
||||
return ni;
|
||||
return json11::Json(ni).dump();
|
||||
}
|
||||
|
||||
static void nfs_kv_extend_inode(nfs_kv_write_state *st, int state, int base_state)
|
||||
|
@ -612,7 +612,7 @@ static void nfs_kv_extend_inode(nfs_kv_write_state *st, int state, int base_stat
|
|||
st->ext->cur_extend = st->ext->next_extend;
|
||||
st->ext->next_extend = 0;
|
||||
st->res2 = -EAGAIN;
|
||||
st->proxy->kvfs->write_inode(st->ino, new_normal_ientry(st), true, [st, base_state](int res)
|
||||
st->proxy->db->set(kv_inode_key(st->ino), new_normal_ientry(st), [st, base_state](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_write(st, base_state+1);
|
||||
|
@ -838,7 +838,7 @@ resume_4:
|
|||
cb(st->res);
|
||||
return;
|
||||
}
|
||||
st->proxy->kvfs->write_inode(st->ino, new_moved_ientry(st), true, [st](int res)
|
||||
st->proxy->db->set(kv_inode_key(st->ino), new_moved_ientry(st), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_write(st, 5);
|
||||
|
@ -881,7 +881,7 @@ resume_7:
|
|||
}
|
||||
resume_8:
|
||||
// We always have to change inode entry on shared writes
|
||||
st->proxy->kvfs->write_inode(st->ino, new_shared_ientry(st), true, [st](int res)
|
||||
st->proxy->db->set(kv_inode_key(st->ino), new_shared_ientry(st), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_write(st, 9);
|
||||
|
@ -930,7 +930,7 @@ resume_11:
|
|||
return;
|
||||
}
|
||||
}
|
||||
st->proxy->kvfs->write_inode(st->ino, new_unshared_ientry(st), true, [st](int res)
|
||||
st->proxy->db->set(kv_inode_key(st->ino), new_unshared_ientry(st), [st](int res)
|
||||
{
|
||||
st->res = res;
|
||||
nfs_kv_continue_write(st, 12);
|
||||
|
@ -953,7 +953,7 @@ resume_12:
|
|||
}
|
||||
// Record removed part of the shared inode as obsolete in statistics
|
||||
st->proxy->kvfs->volume_removed[st->ientry["shared_ino"].uint64_value()] += st->ientry["shared_alloc"].uint64_value();
|
||||
st->ientry_text = new_unshared_ientry(st).dump();
|
||||
st->ientry_text = new_unshared_ientry(st);
|
||||
}
|
||||
// Non-shared write
|
||||
nfs_do_align_write(st, st->ino, st->offset, 0, 13);
|
||||
|
|
|
@ -34,9 +34,6 @@ const char *exe_name = NULL;
|
|||
|
||||
nfs_proxy_t::~nfs_proxy_t()
|
||||
{
|
||||
#ifdef WITH_RDMACM
|
||||
destroy_rdma();
|
||||
#endif
|
||||
if (kvfs)
|
||||
delete kvfs;
|
||||
if (blockfs)
|
||||
|
@ -68,16 +65,9 @@ static const char* help_text =
|
|||
"\n"
|
||||
"vitastor-nfs (--fs <NAME> | --block) start\n"
|
||||
" Start network NFS server. Options:\n"
|
||||
" --bind <IP> bind service to <IP> address (default 0.0.0.0)\n"
|
||||
" --port <PORT> use port <PORT> for NFS services (default is 2049)\n"
|
||||
" specify \"auto\" to auto-select and print port\n"
|
||||
" --portmap 0 do not listen on port 111 (portmap/rpcbind, requires root)\n"
|
||||
" --nfs_rdma <PORT> enable NFS-RDMA at RDMA-CM port <PORT> (you can try 20049)\n"
|
||||
" if RDMA is enabled and --port is set to 0, TCP will be disabled\n"
|
||||
" --nfs_rdma_credit 16 maximum operation credit for RDMA clients (max iodepth)\n"
|
||||
" --nfs_rdma_send 1024 maximum RDMA send operation count (should be larger than iodepth)\n"
|
||||
" --nfs_rdma_alloc 1M RDMA memory allocation rounding\n"
|
||||
" --nfs_rdma_gc 64M maximum unused RDMA buffers\n"
|
||||
" --bind <IP> bind service to <IP> address (default 0.0.0.0)\n"
|
||||
" --port <PORT> use port <PORT> for NFS services (default is 2049)\n"
|
||||
" --portmap 0 do not listen on port 111 (portmap/rpcbind, requires root)\n"
|
||||
"\n"
|
||||
"vitastor-nfs --fs <NAME> upgrade\n"
|
||||
" Upgrade FS metadata. Can be run online, but server(s) should be restarted\n"
|
||||
|
@ -194,7 +184,6 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
srand48(tv.tv_sec*1000000000 + tv.tv_nsec);
|
||||
server_id = (uint64_t)lrand48() | ((uint64_t)lrand48() << 31) | ((uint64_t)lrand48() << 62);
|
||||
// Parse options
|
||||
mountpoint = cfg["mount"].string_value();
|
||||
if (cfg["logfile"].string_value() != "")
|
||||
logfile = cfg["logfile"].string_value();
|
||||
pidfile = cfg["pidfile"].string_value();
|
||||
|
@ -205,24 +194,8 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
default_pool = cfg["pool"].as_string();
|
||||
portmap_enabled = !json_is_false(cfg["portmap"]);
|
||||
nfs_port = cfg["port"].uint64_value() & 0xffff;
|
||||
nfs_rdma_port = cfg["nfs_rdma"].uint64_value() & 0xffff;
|
||||
// Allow RDMA-only mode if port is explicitly set to 0
|
||||
// Allow port auto-selection in server mode if explicitly set to --port auto
|
||||
nfs_port_auto = cfg["port"] == "auto";
|
||||
if (!nfs_port)
|
||||
nfs_port = nfs_port_auto ? 0 : (!cfg["port"].is_null() && nfs_rdma_port ? -1 : 2049);
|
||||
nfs_rdma_credit = cfg["nfs_rdma_credit"].uint64_value();
|
||||
if (!nfs_rdma_credit)
|
||||
nfs_rdma_credit = 16;
|
||||
nfs_rdma_max_send = cfg["nfs_rdma_send"].uint64_value();
|
||||
if (!nfs_rdma_max_send)
|
||||
nfs_rdma_max_send = 1024;
|
||||
nfs_rdma_alloc = cfg["nfs_rdma_alloc"].uint64_value();
|
||||
if (!nfs_rdma_alloc)
|
||||
nfs_rdma_alloc = 1048576;
|
||||
nfs_rdma_gc = cfg["nfs_rdma_gc"].uint64_value();
|
||||
if (!nfs_rdma_gc)
|
||||
nfs_rdma_gc = 64*1048576;
|
||||
nfs_port = 2049;
|
||||
export_root = cfg["nfspath"].string_value();
|
||||
if (!export_root.size())
|
||||
export_root = "/";
|
||||
|
@ -234,6 +207,7 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
obj["client_writeback_allowed"] = true;
|
||||
cfg = obj;
|
||||
}
|
||||
mountpoint = cfg["mount"].string_value();
|
||||
if (mountpoint != "")
|
||||
{
|
||||
bind_address = "127.0.0.1";
|
||||
|
@ -318,60 +292,49 @@ void nfs_proxy_t::run(json11::Json cfg)
|
|||
|
||||
void nfs_proxy_t::run_server(json11::Json cfg)
|
||||
{
|
||||
if (nfs_port != -1)
|
||||
{
|
||||
// Create NFS socket and add it to epoll
|
||||
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, &listening_port);
|
||||
fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int nfs_socket, int epoll_events)
|
||||
{
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
fprintf(stderr, "Listening portmap socket disconnected, exiting\n");
|
||||
exit(1);
|
||||
}
|
||||
else
|
||||
{
|
||||
do_accept(nfs_socket);
|
||||
}
|
||||
});
|
||||
if (nfs_port_auto)
|
||||
{
|
||||
printf("Port: %d\n", listening_port);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
listening_port = nfs_rdma_port;
|
||||
}
|
||||
// Self-register portmap and NFS
|
||||
pmap.reg_ports.insert((portmap_id_t){
|
||||
.prog = PMAP_PROGRAM,
|
||||
.vers = PMAP_V2,
|
||||
.port = (unsigned)(portmap_enabled ? 111 : listening_port),
|
||||
.port = portmap_enabled ? 111 : nfs_port,
|
||||
.owner = "portmapper-service",
|
||||
.addr = portmap_enabled ? "0.0.0.0.0.111" : ("0.0.0.0.0."+std::to_string(listening_port)),
|
||||
.addr = portmap_enabled ? "0.0.0.0.0.111" : ("0.0.0.0.0."+std::to_string(nfs_port)),
|
||||
});
|
||||
pmap.reg_ports.insert((portmap_id_t){
|
||||
.prog = PMAP_PROGRAM,
|
||||
.vers = PMAP_V3,
|
||||
.port = (unsigned)(portmap_enabled ? 111 : listening_port),
|
||||
.port = portmap_enabled ? 111 : nfs_port,
|
||||
.owner = "portmapper-service",
|
||||
.addr = portmap_enabled ? "0.0.0.0.0.111" : ("0.0.0.0.0."+std::to_string(listening_port)),
|
||||
.addr = portmap_enabled ? "0.0.0.0.0.111" : ("0.0.0.0.0."+std::to_string(nfs_port)),
|
||||
});
|
||||
pmap.reg_ports.insert((portmap_id_t){
|
||||
.prog = NFS_PROGRAM,
|
||||
.vers = NFS_V3,
|
||||
.port = (unsigned)listening_port,
|
||||
.port = nfs_port,
|
||||
.owner = "nfs-server",
|
||||
.addr = "0.0.0.0.0."+std::to_string(listening_port),
|
||||
.addr = "0.0.0.0.0."+std::to_string(nfs_port),
|
||||
});
|
||||
pmap.reg_ports.insert((portmap_id_t){
|
||||
.prog = MOUNT_PROGRAM,
|
||||
.vers = MOUNT_V3,
|
||||
.port = (unsigned)listening_port,
|
||||
.port = nfs_port,
|
||||
.owner = "rpc.mountd",
|
||||
.addr = "0.0.0.0.0."+std::to_string(listening_port),
|
||||
.addr = "0.0.0.0.0."+std::to_string(nfs_port),
|
||||
});
|
||||
// Create NFS socket and add it to epoll
|
||||
int nfs_socket = create_and_bind_socket(bind_address, nfs_port, 128, &listening_port);
|
||||
fcntl(nfs_socket, F_SETFL, fcntl(nfs_socket, F_GETFL, 0) | O_NONBLOCK);
|
||||
epmgr->tfd->set_fd_handler(nfs_socket, false, [this](int nfs_socket, int epoll_events)
|
||||
{
|
||||
if (epoll_events & EPOLLRDHUP)
|
||||
{
|
||||
fprintf(stderr, "Listening portmap socket disconnected, exiting\n");
|
||||
exit(1);
|
||||
}
|
||||
else
|
||||
{
|
||||
do_accept(nfs_socket);
|
||||
}
|
||||
});
|
||||
if (portmap_enabled)
|
||||
{
|
||||
|
@ -391,10 +354,6 @@ void nfs_proxy_t::run_server(json11::Json cfg)
|
|||
}
|
||||
});
|
||||
}
|
||||
if (nfs_rdma_port)
|
||||
{
|
||||
rdma_context = create_rdma(bind_address, nfs_rdma_port, nfs_rdma_credit, nfs_rdma_max_send, nfs_rdma_alloc, nfs_rdma_gc);
|
||||
}
|
||||
if (mountpoint != "")
|
||||
{
|
||||
mount_fs();
|
||||
|
@ -540,20 +499,6 @@ void nfs_proxy_t::check_default_pool()
|
|||
}
|
||||
}
|
||||
|
||||
nfs_client_t *nfs_proxy_t::create_client()
|
||||
{
|
||||
auto cli = new nfs_client_t();
|
||||
cli->parent = this;
|
||||
if (kvfs)
|
||||
nfs_kv_procs(cli);
|
||||
else
|
||||
nfs_block_procs(cli);
|
||||
for (auto & fn: pmap.proc_table)
|
||||
cli->proc_table.insert(fn);
|
||||
rpc_clients.insert(cli);
|
||||
return cli;
|
||||
}
|
||||
|
||||
void nfs_proxy_t::do_accept(int listen_fd)
|
||||
{
|
||||
struct sockaddr_storage addr;
|
||||
|
@ -567,8 +512,18 @@ void nfs_proxy_t::do_accept(int listen_fd)
|
|||
fcntl(nfs_fd, F_SETFL, fcntl(nfs_fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
int one = 1;
|
||||
setsockopt(nfs_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
|
||||
auto cli = this->create_client();
|
||||
auto cli = new nfs_client_t();
|
||||
if (kvfs)
|
||||
nfs_kv_procs(cli);
|
||||
else
|
||||
nfs_block_procs(cli);
|
||||
cli->parent = this;
|
||||
cli->nfs_fd = nfs_fd;
|
||||
for (auto & fn: pmap.proc_table)
|
||||
{
|
||||
cli->proc_table.insert(fn);
|
||||
}
|
||||
rpc_clients[nfs_fd] = cli;
|
||||
epmgr->tfd->set_fd_handler(nfs_fd, true, [cli](int nfs_fd, int epoll_events)
|
||||
{
|
||||
// Handle incoming event
|
||||
|
@ -825,17 +780,11 @@ void nfs_client_t::stop()
|
|||
stopped = true;
|
||||
if (refs <= 0)
|
||||
{
|
||||
#ifdef WITH_RDMACM
|
||||
destroy_rdma_conn();
|
||||
#endif
|
||||
auto parent = this->parent;
|
||||
parent->rpc_clients.erase(this);
|
||||
parent->rpc_clients.erase(nfs_fd);
|
||||
parent->active_connections--;
|
||||
if (nfs_fd >= 0)
|
||||
{
|
||||
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
|
||||
close(nfs_fd);
|
||||
}
|
||||
parent->epmgr->tfd->set_fd_handler(nfs_fd, true, NULL);
|
||||
close(nfs_fd);
|
||||
delete this;
|
||||
parent->check_exit();
|
||||
}
|
||||
|
@ -864,7 +813,8 @@ void nfs_client_t::handle_send(int result)
|
|||
if (rop)
|
||||
{
|
||||
// Reply fully sent
|
||||
parent->free_xdr(rop->xdrs);
|
||||
xdr_reset(rop->xdrs);
|
||||
parent->xdr_pool.push_back(rop->xdrs);
|
||||
if (rop->buffer && rop->referenced)
|
||||
{
|
||||
// Dereference the buffer
|
||||
|
@ -881,7 +831,7 @@ void nfs_client_t::handle_send(int result)
|
|||
{
|
||||
// FIXME Maybe put free_buffers into parent
|
||||
free_buffers.push_back((rpc_free_buffer_t){
|
||||
.buf = (uint8_t*)rop->buffer,
|
||||
.buf = rop->buffer,
|
||||
.size = ub.size,
|
||||
});
|
||||
used_buffers.erase(rop->buffer);
|
||||
|
@ -926,6 +876,8 @@ void nfs_client_t::handle_send(int result)
|
|||
void rpc_queue_reply(rpc_op_t *rop)
|
||||
{
|
||||
nfs_client_t *self = (nfs_client_t*)rop->client;
|
||||
iovec *iov_list = NULL;
|
||||
unsigned iov_count = 0;
|
||||
int r = xdr_encode(rop->xdrs, (xdrproc_t)xdr_rpc_msg, &rop->out_msg);
|
||||
assert(r);
|
||||
if (rop->reply_fn != NULL)
|
||||
|
@ -933,78 +885,55 @@ void rpc_queue_reply(rpc_op_t *rop)
|
|||
r = xdr_encode(rop->xdrs, rop->reply_fn, rop->reply);
|
||||
assert(r);
|
||||
}
|
||||
#ifdef WITH_RDMACM
|
||||
if (!self->rdma_conn)
|
||||
#endif
|
||||
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
|
||||
assert(iov_count > 0);
|
||||
rop->reply_marker = 0;
|
||||
for (unsigned i = 0; i < iov_count; i++)
|
||||
{
|
||||
iovec *iov_list = NULL;
|
||||
unsigned iov_count = 0;
|
||||
xdr_encode_finish(rop->xdrs, &iov_list, &iov_count);
|
||||
assert(iov_count > 0);
|
||||
rop->reply_marker = 0;
|
||||
for (unsigned i = 0; i < iov_count; i++)
|
||||
{
|
||||
rop->reply_marker += iov_list[i].iov_len;
|
||||
}
|
||||
rop->reply_marker = htobe32(rop->reply_marker | 0x80000000);
|
||||
auto & to_send_list = self->write_msg.msg_iovlen ? self->next_send_list : self->send_list;
|
||||
auto & to_outbox = self->write_msg.msg_iovlen ? self->next_outbox : self->outbox;
|
||||
to_send_list.push_back((iovec){ .iov_base = &rop->reply_marker, .iov_len = 4 });
|
||||
rop->reply_marker += iov_list[i].iov_len;
|
||||
}
|
||||
rop->reply_marker = htobe32(rop->reply_marker | 0x80000000);
|
||||
auto & to_send_list = self->write_msg.msg_iovlen ? self->next_send_list : self->send_list;
|
||||
auto & to_outbox = self->write_msg.msg_iovlen ? self->next_outbox : self->outbox;
|
||||
to_send_list.push_back((iovec){ .iov_base = &rop->reply_marker, .iov_len = 4 });
|
||||
to_outbox.push_back(NULL);
|
||||
for (unsigned i = 0; i < iov_count; i++)
|
||||
{
|
||||
to_send_list.push_back(iov_list[i]);
|
||||
to_outbox.push_back(NULL);
|
||||
for (unsigned i = 0; i < iov_count; i++)
|
||||
{
|
||||
to_send_list.push_back(iov_list[i]);
|
||||
to_outbox.push_back(NULL);
|
||||
}
|
||||
to_outbox[to_outbox.size()-1] = rop;
|
||||
self->submit_send();
|
||||
}
|
||||
#ifdef WITH_RDMACM
|
||||
else
|
||||
{
|
||||
self->rdma_queue_reply(rop);
|
||||
}
|
||||
#endif
|
||||
to_outbox[to_outbox.size()-1] = rop;
|
||||
self->submit_send();
|
||||
}
|
||||
|
||||
XDR *nfs_proxy_t::get_xdr()
|
||||
int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len)
|
||||
{
|
||||
// Take an XDR object from the pool
|
||||
XDR *xdrs;
|
||||
if (xdr_pool.size())
|
||||
if (parent->xdr_pool.size())
|
||||
{
|
||||
xdrs = xdr_pool.back();
|
||||
xdr_pool.pop_back();
|
||||
xdrs = parent->xdr_pool.back();
|
||||
parent->xdr_pool.pop_back();
|
||||
}
|
||||
else
|
||||
{
|
||||
xdrs = xdr_create();
|
||||
}
|
||||
return xdrs;
|
||||
}
|
||||
|
||||
void nfs_proxy_t::free_xdr(XDR *xdrs)
|
||||
{
|
||||
xdr_reset(xdrs);
|
||||
xdr_pool.push_back(xdrs);
|
||||
}
|
||||
|
||||
int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len)
|
||||
{
|
||||
XDR *xdrs = parent->get_xdr();
|
||||
// Decode the RPC header
|
||||
char inmsg_data[sizeof(rpc_msg)];
|
||||
rpc_msg *inmsg = (rpc_msg*)&inmsg_data;
|
||||
if (!xdr_decode(xdrs, msg_buf, msg_len, (xdrproc_t)xdr_rpc_msg, inmsg))
|
||||
{
|
||||
// Invalid message, ignore it
|
||||
parent->free_xdr(xdrs);
|
||||
xdr_reset(xdrs);
|
||||
parent->xdr_pool.push_back(xdrs);
|
||||
return 0;
|
||||
}
|
||||
if (inmsg->body.dir != RPC_CALL)
|
||||
{
|
||||
// Reply sent to the server? Strange thing. Also ignore it
|
||||
parent->free_xdr(xdrs);
|
||||
xdr_reset(xdrs);
|
||||
parent->xdr_pool.push_back(xdrs);
|
||||
return 0;
|
||||
}
|
||||
if (inmsg->body.cbody.rpcvers != RPC_MSG_VERSION)
|
||||
|
@ -1039,17 +968,6 @@ int nfs_client_t::handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg
|
|||
// Incoming buffer isn't needed to handle request, so return 0
|
||||
return 0;
|
||||
}
|
||||
auto rop = create_rpc_op(xdrs, base_buf, inmsg, NULL);
|
||||
if (!rop)
|
||||
{
|
||||
// No such procedure
|
||||
return 0;
|
||||
}
|
||||
return handle_rpc_op(rop);
|
||||
}
|
||||
|
||||
rpc_op_t *nfs_client_t::create_rpc_op(XDR *xdrs, void *buffer, rpc_msg *inmsg, rdma_msg *rmsg)
|
||||
{
|
||||
// Find decoder for the request
|
||||
auto proc_it = proc_table.find((rpc_service_proc_t){
|
||||
.prog = inmsg->body.cbody.prog,
|
||||
|
@ -1077,7 +995,6 @@ rpc_op_t *nfs_client_t::create_rpc_op(XDR *xdrs, void *buffer, rpc_msg *inmsg, r
|
|||
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(sizeof(rpc_op_t));
|
||||
*rop = (rpc_op_t){
|
||||
.client = this,
|
||||
.buffer = buffer,
|
||||
.xdrs = xdrs,
|
||||
.out_msg = (rpc_msg){
|
||||
.xid = inmsg->xid,
|
||||
|
@ -1100,15 +1017,9 @@ rpc_op_t *nfs_client_t::create_rpc_op(XDR *xdrs, void *buffer, rpc_msg *inmsg, r
|
|||
},
|
||||
},
|
||||
};
|
||||
// FIXME: malloc and avoid copy?
|
||||
memcpy(&rop->in_msg, inmsg, sizeof(rpc_msg));
|
||||
if (rmsg)
|
||||
{
|
||||
memcpy(&rop->in_rdma_msg, rmsg, sizeof(rdma_msg));
|
||||
}
|
||||
rpc_queue_reply(rop);
|
||||
// Incoming buffer isn't needed to handle request, so return 0
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
// Allocate memory
|
||||
rpc_op_t *rop = (rpc_op_t*)malloc_or_die(
|
||||
|
@ -1117,7 +1028,7 @@ rpc_op_t *nfs_client_t::create_rpc_op(XDR *xdrs, void *buffer, rpc_msg *inmsg, r
|
|||
rpc_reply_stat x = RPC_MSG_ACCEPTED;
|
||||
*rop = (rpc_op_t){
|
||||
.client = this,
|
||||
.buffer = buffer,
|
||||
.buffer = (uint8_t*)base_buf,
|
||||
.xdrs = xdrs,
|
||||
.out_msg = (rpc_msg){
|
||||
.xid = inmsg->xid,
|
||||
|
@ -1134,25 +1045,10 @@ rpc_op_t *nfs_client_t::create_rpc_op(XDR *xdrs, void *buffer, rpc_msg *inmsg, r
|
|||
.request = ((uint8_t*)rop) + sizeof(rpc_op_t),
|
||||
.reply = ((uint8_t*)rop) + sizeof(rpc_op_t) + proc_it->req_size,
|
||||
};
|
||||
// FIXME: malloc and avoid copy?
|
||||
memcpy(&rop->in_msg, inmsg, sizeof(rpc_msg));
|
||||
if (rmsg)
|
||||
{
|
||||
memcpy(&rop->in_rdma_msg, rmsg, sizeof(rdma_msg));
|
||||
}
|
||||
return rop;
|
||||
}
|
||||
|
||||
int nfs_client_t::handle_rpc_op(rpc_op_t *rop)
|
||||
{
|
||||
// Try to decode the request
|
||||
// req_fn may be NULL, that means function has no arguments
|
||||
auto proc_it = proc_table.find((rpc_service_proc_t){
|
||||
.prog = rop->in_msg.body.cbody.prog,
|
||||
.vers = rop->in_msg.body.cbody.vers,
|
||||
.proc = rop->in_msg.body.cbody.proc,
|
||||
});
|
||||
if (proc_it == proc_table.end() || proc_it->req_fn && !proc_it->req_fn(rop->xdrs, rop->request))
|
||||
if (proc_it->req_fn && !proc_it->req_fn(xdrs, rop->request))
|
||||
{
|
||||
// Invalid request
|
||||
rop->out_msg.body.rbody.areply.reply_data.stat = RPC_GARBAGE_ARGS;
|
||||
|
@ -1162,55 +1058,18 @@ int nfs_client_t::handle_rpc_op(rpc_op_t *rop)
|
|||
}
|
||||
rop->out_msg.body.rbody.areply.reply_data.stat = RPC_SUCCESS;
|
||||
rop->reply_fn = proc_it->resp_fn;
|
||||
rop->referenced = 0;
|
||||
int ref = proc_it->handler_fn(proc_it->opaque, rop);
|
||||
if (ref)
|
||||
rop->referenced = 1;
|
||||
rop->referenced = ref ? 1 : 0;
|
||||
return ref;
|
||||
}
|
||||
|
||||
void *nfs_client_t::malloc_or_rdma(rpc_op_t *rop, size_t size)
|
||||
{
|
||||
#ifdef WITH_RDMACM
|
||||
if (!rdma_conn)
|
||||
{
|
||||
#endif
|
||||
void *buf = malloc_or_die(size);
|
||||
xdr_add_malloc(rop->xdrs, buf);
|
||||
return buf;
|
||||
#ifdef WITH_RDMACM
|
||||
}
|
||||
void *buf = rdma_malloc(size);
|
||||
xdr_set_rdma_chunk(rop->xdrs, buf);
|
||||
return buf;
|
||||
#endif
|
||||
}
|
||||
|
||||
void nfs_client_t::free_or_rdma(rpc_op_t *rop, void *buf)
|
||||
{
|
||||
#ifdef WITH_RDMACM
|
||||
if (!rdma_conn)
|
||||
{
|
||||
#endif
|
||||
xdr_del_malloc(rop->xdrs, buf);
|
||||
free(buf);
|
||||
#ifdef WITH_RDMACM
|
||||
}
|
||||
else
|
||||
{
|
||||
xdr_set_rdma_chunk(rop->xdrs, NULL);
|
||||
rdma_free(buf);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void nfs_proxy_t::daemonize()
|
||||
{
|
||||
// Stop all clients because client I/O sometimes breaks during daemonize
|
||||
// I.e. the new process stops receiving events on the old FD
|
||||
// It doesn't happen if we call sleep(1) here, but we don't want to call sleep(1)...
|
||||
for (auto & cli: rpc_clients)
|
||||
cli->stop();
|
||||
for (auto & clp: rpc_clients)
|
||||
clp.second->stop();
|
||||
if (fork())
|
||||
exit(0);
|
||||
setsid();
|
||||
|
|
|
@ -22,7 +22,6 @@ class cli_tool_t;
|
|||
struct kv_fs_state_t;
|
||||
struct block_fs_state_t;
|
||||
class nfs_client_t;
|
||||
struct nfs_rdma_context_t;
|
||||
|
||||
class nfs_proxy_t
|
||||
{
|
||||
|
@ -34,13 +33,7 @@ public:
|
|||
std::string default_pool;
|
||||
std::string export_root;
|
||||
bool portmap_enabled;
|
||||
bool nfs_port_auto = false;
|
||||
unsigned nfs_port = 0;
|
||||
unsigned nfs_rdma_port = 0;
|
||||
uint32_t nfs_rdma_credit = 16;
|
||||
uint32_t nfs_rdma_max_send = 1024;
|
||||
uint64_t nfs_rdma_alloc = 1048576;
|
||||
uint64_t nfs_rdma_gc = 500*1048576;
|
||||
unsigned nfs_port;
|
||||
int trace = 0;
|
||||
std::string logfile = "/dev/null";
|
||||
std::string pidfile;
|
||||
|
@ -62,8 +55,7 @@ public:
|
|||
vitastorkv_dbw_t *db = NULL;
|
||||
kv_fs_state_t *kvfs = NULL;
|
||||
block_fs_state_t *blockfs = NULL;
|
||||
nfs_rdma_context_t* rdma_context = NULL;
|
||||
std::set<nfs_client_t*> rpc_clients;
|
||||
std::map<int, nfs_client_t*> rpc_clients;
|
||||
|
||||
std::vector<XDR*> xdr_pool;
|
||||
|
||||
|
@ -80,20 +72,12 @@ public:
|
|||
void watch_stats();
|
||||
void parse_stats(etcd_kv_t & kv);
|
||||
void check_default_pool();
|
||||
nfs_client_t* create_client();
|
||||
void do_accept(int listen_fd);
|
||||
void daemonize();
|
||||
void write_pid();
|
||||
void mount_fs();
|
||||
void check_already_mounted();
|
||||
void check_exit();
|
||||
|
||||
nfs_rdma_context_t* create_rdma(const std::string & bind_address, int rdmacm_port,
|
||||
uint32_t max_iodepth, uint32_t max_send_wr, uint64_t rdma_malloc_round_to, uint64_t rdma_max_unused_buffers);
|
||||
void destroy_rdma();
|
||||
|
||||
XDR *get_xdr();
|
||||
void free_xdr(XDR *xdrs);
|
||||
};
|
||||
|
||||
struct rpc_cur_buffer_t
|
||||
|
@ -117,24 +101,19 @@ struct rpc_free_buffer_t
|
|||
unsigned size;
|
||||
};
|
||||
|
||||
struct nfs_rdma_conn_t;
|
||||
|
||||
class nfs_client_t
|
||||
{
|
||||
public:
|
||||
nfs_proxy_t *parent = NULL;
|
||||
int nfs_fd;
|
||||
int epoll_events = 0;
|
||||
int refs = 0;
|
||||
bool stopped = false;
|
||||
std::set<rpc_service_proc_t> proc_table;
|
||||
nfs_rdma_conn_t *rdma_conn = NULL;
|
||||
|
||||
// <TCP>
|
||||
int nfs_fd = -1;
|
||||
int epoll_events = 0;
|
||||
|
||||
// Read state
|
||||
rpc_cur_buffer_t cur_buffer = { 0 };
|
||||
std::map<void*, rpc_used_buffer_t> used_buffers;
|
||||
std::map<uint8_t*, rpc_used_buffer_t> used_buffers;
|
||||
std::vector<rpc_free_buffer_t> free_buffers;
|
||||
|
||||
iovec read_iov;
|
||||
|
@ -151,16 +130,7 @@ public:
|
|||
void submit_send();
|
||||
void handle_send(int result);
|
||||
int handle_rpc_message(void *base_buf, void *msg_buf, uint32_t msg_len);
|
||||
// </TCP>
|
||||
|
||||
rpc_op_t *create_rpc_op(XDR *xdrs, void *buffer, rpc_msg *inmsg, rdma_msg *rmsg);
|
||||
int handle_rpc_op(rpc_op_t *rop);
|
||||
bool deref();
|
||||
void stop();
|
||||
void *malloc_or_rdma(rpc_op_t *rop, size_t size);
|
||||
void free_or_rdma(rpc_op_t *rop, void *buf);
|
||||
void *rdma_malloc(size_t size);
|
||||
void rdma_free(void *buf);
|
||||
void rdma_queue_reply(rpc_op_t *rop);
|
||||
void destroy_rdma_conn();
|
||||
};
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -168,7 +168,7 @@ struct WRITE3args {
|
|||
offset3 offset;
|
||||
count3 count;
|
||||
stable_how stable;
|
||||
opaque data<>; /* RDMA DDP-eligible */
|
||||
opaque data<>;
|
||||
};
|
||||
|
||||
typedef opaque writeverf3[NFS3_WRITEVERFSIZE];
|
||||
|
@ -409,7 +409,7 @@ struct READ3resok {
|
|||
post_op_attr file_attributes;
|
||||
count3 count;
|
||||
bool eof;
|
||||
opaque data<>; /* RDMA DDP-eligible */
|
||||
opaque data<>;
|
||||
};
|
||||
|
||||
struct READ3resfail {
|
||||
|
@ -514,7 +514,7 @@ typedef string nfspath3<>;
|
|||
|
||||
struct symlinkdata3 {
|
||||
sattr3 symlink_attributes;
|
||||
nfspath3 symlink_data; /* RDMA DDP-eligible */
|
||||
nfspath3 symlink_data;
|
||||
};
|
||||
|
||||
struct SYMLINK3args {
|
||||
|
@ -546,7 +546,7 @@ struct READLINK3args {
|
|||
|
||||
struct READLINK3resok {
|
||||
post_op_attr symlink_attributes;
|
||||
nfspath3 data; /* RDMA DDP-eligible */
|
||||
nfspath3 data;
|
||||
};
|
||||
|
||||
struct READLINK3resfail {
|
||||
|
|
|
@ -272,7 +272,7 @@ xdr_WRITE3args (XDR *xdrs, WRITE3args *objp)
|
|||
return FALSE;
|
||||
if (!xdr_stable_how (xdrs, &objp->stable))
|
||||
return FALSE;
|
||||
if (!xdr_bytes(xdrs, &objp->data, ~0, true))
|
||||
if (!xdr_bytes(xdrs, &objp->data, ~0))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
@ -829,7 +829,7 @@ xdr_READ3resok (XDR *xdrs, READ3resok *objp)
|
|||
return FALSE;
|
||||
if (!xdr_bool (xdrs, &objp->eof))
|
||||
return FALSE;
|
||||
if (!xdr_bytes(xdrs, &objp->data, ~0, true))
|
||||
if (!xdr_bytes(xdrs, &objp->data, ~0))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
@ -1173,10 +1173,10 @@ xdr_PATHCONF3res (XDR *xdrs, PATHCONF3res *objp)
|
|||
}
|
||||
|
||||
bool_t
|
||||
xdr_nfspath3 (XDR *xdrs, nfspath3 *objp, bool rdma_chunk)
|
||||
xdr_nfspath3 (XDR *xdrs, nfspath3 *objp)
|
||||
{
|
||||
|
||||
if (!xdr_string (xdrs, objp, ~0, rdma_chunk))
|
||||
if (!xdr_string (xdrs, objp, ~0))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
@ -1187,7 +1187,7 @@ xdr_symlinkdata3 (XDR *xdrs, symlinkdata3 *objp)
|
|||
|
||||
if (!xdr_sattr3 (xdrs, &objp->symlink_attributes))
|
||||
return FALSE;
|
||||
if (!xdr_nfspath3 (xdrs, &objp->symlink_data, true))
|
||||
if (!xdr_nfspath3 (xdrs, &objp->symlink_data))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
@ -1259,7 +1259,7 @@ xdr_READLINK3resok (XDR *xdrs, READLINK3resok *objp)
|
|||
|
||||
if (!xdr_post_op_attr (xdrs, &objp->symlink_attributes))
|
||||
return FALSE;
|
||||
if (!xdr_nfspath3 (xdrs, &objp->data, true))
|
||||
if (!xdr_nfspath3 (xdrs, &objp->data))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
|
|
@ -1,53 +0,0 @@
|
|||
diff --git a/src/nfs/proto/nfs_xdr.cpp b/src/nfs/proto/nfs_xdr.cpp
|
||||
index 87451293..5897e6ad 100644
|
||||
--- a/src/nfs/proto/nfs_xdr.cpp
|
||||
+++ b/src/nfs/proto/nfs_xdr.cpp
|
||||
@@ -272,7 +272,7 @@ xdr_WRITE3args (XDR *xdrs, WRITE3args *objp)
|
||||
return FALSE;
|
||||
if (!xdr_stable_how (xdrs, &objp->stable))
|
||||
return FALSE;
|
||||
- if (!xdr_bytes(xdrs, &objp->data, ~0))
|
||||
+ if (!xdr_bytes(xdrs, &objp->data, ~0, true))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
@@ -829,7 +829,7 @@ xdr_READ3resok (XDR *xdrs, READ3resok *objp)
|
||||
return FALSE;
|
||||
if (!xdr_bool (xdrs, &objp->eof))
|
||||
return FALSE;
|
||||
- if (!xdr_bytes(xdrs, &objp->data, ~0))
|
||||
+ if (!xdr_bytes(xdrs, &objp->data, ~0, true))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
@@ -1173,10 +1173,10 @@ xdr_PATHCONF3res (XDR *xdrs, PATHCONF3res *objp)
|
||||
}
|
||||
|
||||
bool_t
|
||||
-xdr_nfspath3 (XDR *xdrs, nfspath3 *objp)
|
||||
+xdr_nfspath3 (XDR *xdrs, nfspath3 *objp, bool rdma_chunk)
|
||||
{
|
||||
|
||||
- if (!xdr_string (xdrs, objp, ~0))
|
||||
+ if (!xdr_string (xdrs, objp, ~0, rdma_chunk))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
@@ -1187,7 +1187,7 @@ xdr_symlinkdata3 (XDR *xdrs, symlinkdata3 *objp)
|
||||
|
||||
if (!xdr_sattr3 (xdrs, &objp->symlink_attributes))
|
||||
return FALSE;
|
||||
- if (!xdr_nfspath3 (xdrs, &objp->symlink_data))
|
||||
+ if (!xdr_nfspath3 (xdrs, &objp->symlink_data, true))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
@@ -1259,7 +1259,7 @@ xdr_READLINK3resok (XDR *xdrs, READLINK3resok *objp)
|
||||
|
||||
if (!xdr_post_op_attr (xdrs, &objp->symlink_attributes))
|
||||
return FALSE;
|
||||
- if (!xdr_nfspath3 (xdrs, &objp->data))
|
||||
+ if (!xdr_nfspath3 (xdrs, &objp->data, true))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
|
@ -1,7 +1,6 @@
|
|||
#pragma once
|
||||
|
||||
#include "rpc.h"
|
||||
#include "rpc_rdma.h"
|
||||
|
||||
struct rpc_op_t;
|
||||
|
||||
|
@ -28,16 +27,12 @@ inline bool operator < (const rpc_service_proc_t & a, const rpc_service_proc_t &
|
|||
return a.prog < b.prog || a.prog == b.prog && (a.vers < b.vers || a.vers == b.vers && a.proc < b.proc);
|
||||
}
|
||||
|
||||
struct rdma_msg;
|
||||
|
||||
struct rpc_op_t
|
||||
{
|
||||
void *client;
|
||||
void *buffer;
|
||||
uint8_t *buffer;
|
||||
XDR *xdrs;
|
||||
rpc_msg in_msg, out_msg;
|
||||
rdma_msg in_rdma_msg;
|
||||
rpc_rdma_errcode rdma_error;
|
||||
void *request;
|
||||
void *reply;
|
||||
xdrproc_t reply_fn;
|
||||
|
|
|
@ -1,144 +0,0 @@
|
|||
/*
|
||||
* Please do not edit this file.
|
||||
* It was generated using rpcgen.
|
||||
*/
|
||||
|
||||
#ifndef _RPC_RDMA_H_RPCGEN
|
||||
#define _RPC_RDMA_H_RPCGEN
|
||||
|
||||
#include "xdr_impl.h"
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
|
||||
struct xdr_rdma_segment {
|
||||
uint32_t handle;
|
||||
uint32_t length;
|
||||
uint64_t offset;
|
||||
};
|
||||
typedef struct xdr_rdma_segment xdr_rdma_segment;
|
||||
|
||||
struct xdr_read_chunk {
|
||||
uint32_t position;
|
||||
struct xdr_rdma_segment target;
|
||||
};
|
||||
typedef struct xdr_read_chunk xdr_read_chunk;
|
||||
|
||||
struct xdr_read_list {
|
||||
struct xdr_read_chunk entry;
|
||||
struct xdr_read_list *next;
|
||||
};
|
||||
typedef struct xdr_read_list xdr_read_list;
|
||||
|
||||
struct xdr_write_chunk {
|
||||
struct {
|
||||
u_int target_len;
|
||||
struct xdr_rdma_segment *target_val;
|
||||
} target;
|
||||
};
|
||||
typedef struct xdr_write_chunk xdr_write_chunk;
|
||||
|
||||
struct xdr_write_list {
|
||||
struct xdr_write_chunk entry;
|
||||
struct xdr_write_list *next;
|
||||
};
|
||||
typedef struct xdr_write_list xdr_write_list;
|
||||
|
||||
struct rpc_rdma_header {
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
};
|
||||
typedef struct rpc_rdma_header rpc_rdma_header;
|
||||
|
||||
struct rpc_rdma_header_nomsg {
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
};
|
||||
typedef struct rpc_rdma_header_nomsg rpc_rdma_header_nomsg;
|
||||
|
||||
struct rpc_rdma_header_padded {
|
||||
uint32_t rdma_align;
|
||||
uint32_t rdma_thresh;
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
};
|
||||
typedef struct rpc_rdma_header_padded rpc_rdma_header_padded;
|
||||
|
||||
enum rpc_rdma_errcode {
|
||||
ERR_VERS = 1,
|
||||
ERR_CHUNK = 2,
|
||||
};
|
||||
typedef enum rpc_rdma_errcode rpc_rdma_errcode;
|
||||
|
||||
struct rpc_rdma_errvers {
|
||||
uint32_t rdma_vers_low;
|
||||
uint32_t rdma_vers_high;
|
||||
};
|
||||
typedef struct rpc_rdma_errvers rpc_rdma_errvers;
|
||||
|
||||
struct rpc_rdma_error {
|
||||
rpc_rdma_errcode err;
|
||||
union {
|
||||
rpc_rdma_errvers range;
|
||||
};
|
||||
};
|
||||
typedef struct rpc_rdma_error rpc_rdma_error;
|
||||
|
||||
enum rdma_proc {
|
||||
RDMA_MSG = 0,
|
||||
RDMA_NOMSG = 1,
|
||||
RDMA_MSGP = 2,
|
||||
RDMA_DONE = 3,
|
||||
RDMA_ERROR = 4,
|
||||
};
|
||||
typedef enum rdma_proc rdma_proc;
|
||||
|
||||
struct rdma_body {
|
||||
rdma_proc proc;
|
||||
union {
|
||||
rpc_rdma_header rdma_msg;
|
||||
rpc_rdma_header_nomsg rdma_nomsg;
|
||||
rpc_rdma_header_padded rdma_msgp;
|
||||
rpc_rdma_error rdma_error;
|
||||
};
|
||||
};
|
||||
typedef struct rdma_body rdma_body;
|
||||
|
||||
struct rdma_msg {
|
||||
uint32_t rdma_xid;
|
||||
uint32_t rdma_vers;
|
||||
uint32_t rdma_credit;
|
||||
rdma_body rdma_body;
|
||||
};
|
||||
typedef struct rdma_msg rdma_msg;
|
||||
|
||||
/* the xdr functions */
|
||||
|
||||
|
||||
extern bool_t xdr_xdr_rdma_segment (XDR *, xdr_rdma_segment*);
|
||||
extern bool_t xdr_xdr_read_chunk (XDR *, xdr_read_chunk*);
|
||||
extern bool_t xdr_xdr_read_list (XDR *, xdr_read_list*);
|
||||
extern bool_t xdr_xdr_write_chunk (XDR *, xdr_write_chunk*);
|
||||
extern bool_t xdr_xdr_write_list (XDR *, xdr_write_list*);
|
||||
extern bool_t xdr_rpc_rdma_header (XDR *, rpc_rdma_header*);
|
||||
extern bool_t xdr_rpc_rdma_header_nomsg (XDR *, rpc_rdma_header_nomsg*);
|
||||
extern bool_t xdr_rpc_rdma_header_padded (XDR *, rpc_rdma_header_padded*);
|
||||
extern bool_t xdr_rpc_rdma_errcode (XDR *, rpc_rdma_errcode*);
|
||||
extern bool_t xdr_rpc_rdma_errvers (XDR *, rpc_rdma_errvers*);
|
||||
extern bool_t xdr_rpc_rdma_error (XDR *, rpc_rdma_error*);
|
||||
extern bool_t xdr_rdma_proc (XDR *, rdma_proc*);
|
||||
extern bool_t xdr_rdma_body (XDR *, rdma_body*);
|
||||
extern bool_t xdr_rdma_msg (XDR *, rdma_msg*);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* !_RPC_RDMA_H_RPCGEN */
|
|
@ -1,166 +0,0 @@
|
|||
/* RFC 8166 - Remote Direct Memory Access Transport for Remote Procedure Call Version 1 */
|
||||
|
||||
/*
|
||||
* Copyright (c) 2010-2017 IETF Trust and the persons
|
||||
* identified as authors of the code. All rights reserved.
|
||||
*
|
||||
* The authors of the code are:
|
||||
* B. Callaghan, T. Talpey, and C. Lever
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with
|
||||
* or without modification, are permitted provided that the
|
||||
* following conditions are met:
|
||||
*
|
||||
* - Redistributions of source code must retain the above
|
||||
* copyright notice, this list of conditions and the
|
||||
* following disclaimer.
|
||||
*
|
||||
* - Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the
|
||||
* following disclaimer in the documentation and/or other
|
||||
* materials provided with the distribution.
|
||||
*
|
||||
* - Neither the name of Internet Society, IETF or IETF
|
||||
* Trust, nor the names of specific contributors, may be
|
||||
* used to endorse or promote products derived from this
|
||||
* software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS
|
||||
* AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
|
||||
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
||||
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
|
||||
* EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
||||
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
|
||||
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
||||
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
|
||||
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
|
||||
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Plain RDMA segment (Section 3.4.3)
|
||||
*/
|
||||
struct xdr_rdma_segment {
|
||||
uint32_t handle; /* Registered memory handle */
|
||||
uint32_t length; /* Length of the chunk in bytes */
|
||||
uint64_t offset; /* Chunk virtual address or offset */
|
||||
};
|
||||
|
||||
/*
|
||||
* RDMA read segment (Section 3.4.5)
|
||||
*/
|
||||
struct xdr_read_chunk {
|
||||
uint32_t position; /* Position in XDR stream */
|
||||
struct xdr_rdma_segment target;
|
||||
};
|
||||
|
||||
/*
|
||||
* Read list (Section 4.3.1)
|
||||
*/
|
||||
struct xdr_read_list {
|
||||
struct xdr_read_chunk entry;
|
||||
struct xdr_read_list *next;
|
||||
};
|
||||
|
||||
/*
|
||||
* Write chunk (Section 3.4.6)
|
||||
*/
|
||||
struct xdr_write_chunk {
|
||||
struct xdr_rdma_segment target<>;
|
||||
};
|
||||
|
||||
/*
|
||||
* Write list (Section 4.3.2)
|
||||
*/
|
||||
struct xdr_write_list {
|
||||
struct xdr_write_chunk entry;
|
||||
struct xdr_write_list *next;
|
||||
};
|
||||
|
||||
/*
|
||||
* Chunk lists (Section 4.3)
|
||||
*/
|
||||
struct rpc_rdma_header {
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
/* rpc body follows */
|
||||
};
|
||||
|
||||
struct rpc_rdma_header_nomsg {
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
};
|
||||
|
||||
/* Not to be used */
|
||||
struct rpc_rdma_header_padded {
|
||||
uint32_t rdma_align;
|
||||
uint32_t rdma_thresh;
|
||||
struct xdr_read_list *rdma_reads;
|
||||
struct xdr_write_list *rdma_writes;
|
||||
struct xdr_write_chunk *rdma_reply;
|
||||
/* rpc body follows */
|
||||
};
|
||||
|
||||
/*
|
||||
* Error handling (Section 4.5)
|
||||
*/
|
||||
enum rpc_rdma_errcode {
|
||||
ERR_VERS = 1, /* Value fixed for all versions */
|
||||
ERR_CHUNK = 2
|
||||
};
|
||||
|
||||
/* Structure fixed for all versions */
|
||||
struct rpc_rdma_errvers {
|
||||
uint32_t rdma_vers_low;
|
||||
uint32_t rdma_vers_high;
|
||||
};
|
||||
|
||||
union rpc_rdma_error switch (rpc_rdma_errcode err) {
|
||||
case ERR_VERS:
|
||||
rpc_rdma_errvers range;
|
||||
case ERR_CHUNK:
|
||||
void;
|
||||
};
|
||||
|
||||
/*
|
||||
* Procedures (Section 4.2.4)
|
||||
*/
|
||||
enum rdma_proc {
|
||||
RDMA_MSG = 0, /* Value fixed for all versions */
|
||||
RDMA_NOMSG = 1, /* Value fixed for all versions */
|
||||
RDMA_MSGP = 2, /* Not to be used */
|
||||
RDMA_DONE = 3, /* Not to be used */
|
||||
RDMA_ERROR = 4 /* Value fixed for all versions */
|
||||
};
|
||||
|
||||
/* The position of the proc discriminator field is
|
||||
* fixed for all versions */
|
||||
union rdma_body switch (rdma_proc proc) {
|
||||
case RDMA_MSG:
|
||||
rpc_rdma_header rdma_msg;
|
||||
case RDMA_NOMSG:
|
||||
rpc_rdma_header_nomsg rdma_nomsg;
|
||||
case RDMA_MSGP: /* Not to be used */
|
||||
rpc_rdma_header_padded rdma_msgp;
|
||||
case RDMA_DONE: /* Not to be used */
|
||||
void;
|
||||
case RDMA_ERROR:
|
||||
rpc_rdma_error rdma_error;
|
||||
};
|
||||
|
||||
/*
|
||||
* Fixed header fields (Section 4.2)
|
||||
*/
|
||||
struct rdma_msg {
|
||||
uint32_t rdma_xid; /* Position fixed for all versions */
|
||||
uint32_t rdma_vers; /* Position fixed for all versions */
|
||||
uint32_t rdma_credit; /* Position fixed for all versions */
|
||||
rdma_body rdma_body;
|
||||
};
|
|
@ -1,200 +0,0 @@
|
|||
/*
|
||||
* Please do not edit this file.
|
||||
* It was generated using rpcgen.
|
||||
*/
|
||||
|
||||
#include "rpc_rdma.h"
|
||||
#include "xdr_impl_inline.h"
|
||||
|
||||
bool_t
|
||||
xdr_xdr_rdma_segment (XDR *xdrs, xdr_rdma_segment *objp)
|
||||
{
|
||||
|
||||
if (!xdr_uint32_t (xdrs, &objp->handle))
|
||||
return FALSE;
|
||||
if (!xdr_uint32_t (xdrs, &objp->length))
|
||||
return FALSE;
|
||||
if (!xdr_uint64_t (xdrs, &objp->offset))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_xdr_read_chunk (XDR *xdrs, xdr_read_chunk *objp)
|
||||
{
|
||||
|
||||
if (!xdr_uint32_t (xdrs, &objp->position))
|
||||
return FALSE;
|
||||
if (!xdr_xdr_rdma_segment (xdrs, &objp->target))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_xdr_read_list (XDR *xdrs, xdr_read_list *objp)
|
||||
{
|
||||
|
||||
if (!xdr_xdr_read_chunk (xdrs, &objp->entry))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_xdr_write_chunk (XDR *xdrs, xdr_write_chunk *objp)
|
||||
{
|
||||
|
||||
if (!xdr_array (xdrs, (char **)&objp->target.target_val, (u_int *) &objp->target.target_len, ~0,
|
||||
sizeof (xdr_rdma_segment), (xdrproc_t) xdr_xdr_rdma_segment))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_xdr_write_list (XDR *xdrs, xdr_write_list *objp)
|
||||
{
|
||||
|
||||
if (!xdr_xdr_write_chunk (xdrs, &objp->entry))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->next, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_header (XDR *xdrs, rpc_rdma_header *objp)
|
||||
{
|
||||
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_header_nomsg (XDR *xdrs, rpc_rdma_header_nomsg *objp)
|
||||
{
|
||||
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_header_padded (XDR *xdrs, rpc_rdma_header_padded *objp)
|
||||
{
|
||||
|
||||
if (!xdr_uint32_t (xdrs, &objp->rdma_align))
|
||||
return FALSE;
|
||||
if (!xdr_uint32_t (xdrs, &objp->rdma_thresh))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reads, sizeof (xdr_read_list), (xdrproc_t) xdr_xdr_read_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_writes, sizeof (xdr_write_list), (xdrproc_t) xdr_xdr_write_list))
|
||||
return FALSE;
|
||||
if (!xdr_pointer (xdrs, (char **)&objp->rdma_reply, sizeof (xdr_write_chunk), (xdrproc_t) xdr_xdr_write_chunk))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_errcode (XDR *xdrs, rpc_rdma_errcode *objp)
|
||||
{
|
||||
|
||||
if (!xdr_enum (xdrs, (enum_t *) objp))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_errvers (XDR *xdrs, rpc_rdma_errvers *objp)
|
||||
{
|
||||
|
||||
if (!xdr_uint32_t (xdrs, &objp->rdma_vers_low))
|
||||
return FALSE;
|
||||
if (!xdr_uint32_t (xdrs, &objp->rdma_vers_high))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rpc_rdma_error (XDR *xdrs, rpc_rdma_error *objp)
|
||||
{
|
||||
|
||||
if (!xdr_rpc_rdma_errcode (xdrs, &objp->err))
|
||||
return FALSE;
|
||||
switch (objp->err) {
|
||||
case ERR_VERS:
|
||||
if (!xdr_rpc_rdma_errvers (xdrs, &objp->range))
|
||||
return FALSE;
|
||||
break;
|
||||
case ERR_CHUNK:
|
||||
break;
|
||||
default:
|
||||
return FALSE;
|
||||
}
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rdma_proc (XDR *xdrs, rdma_proc *objp)
|
||||
{
|
||||
|
||||
if (!xdr_enum (xdrs, (enum_t *) objp))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rdma_body (XDR *xdrs, rdma_body *objp)
|
||||
{
|
||||
|
||||
if (!xdr_rdma_proc (xdrs, &objp->proc))
|
||||
return FALSE;
|
||||
switch (objp->proc) {
|
||||
case RDMA_MSG:
|
||||
if (!xdr_rpc_rdma_header (xdrs, &objp->rdma_msg))
|
||||
return FALSE;
|
||||
break;
|
||||
case RDMA_NOMSG:
|
||||
if (!xdr_rpc_rdma_header_nomsg (xdrs, &objp->rdma_nomsg))
|
||||
return FALSE;
|
||||
break;
|
||||
case RDMA_MSGP:
|
||||
if (!xdr_rpc_rdma_header_padded (xdrs, &objp->rdma_msgp))
|
||||
return FALSE;
|
||||
break;
|
||||
case RDMA_DONE:
|
||||
break;
|
||||
case RDMA_ERROR:
|
||||
if (!xdr_rpc_rdma_error (xdrs, &objp->rdma_error))
|
||||
return FALSE;
|
||||
break;
|
||||
default:
|
||||
return FALSE;
|
||||
}
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
bool_t
|
||||
xdr_rdma_msg (XDR *xdrs, rdma_msg *objp)
|
||||
{
|
||||
|
||||
if (!xdr_uint32_t (xdrs, &objp->rdma_xid))
|
||||
return FALSE;
|
||||
if (!xdr_uint32_t (xdrs, &objp->rdma_vers))
|
||||
return FALSE;
|
||||
if (!xdr_uint32_t (xdrs, &objp->rdma_credit))
|
||||
return FALSE;
|
||||
if (!xdr_rdma_body (xdrs, &objp->rdma_body))
|
||||
return FALSE;
|
||||
return TRUE;
|
||||
}
|
|
@ -46,5 +46,3 @@ run_rpcgen() {
|
|||
run_rpcgen nfs
|
||||
run_rpcgen rpc
|
||||
run_rpcgen portmap
|
||||
run_rpcgen rpc_rdma
|
||||
patch nfs_xdr.cpp < nfs_xdr.cpp.diff
|
||||
|
|
|
@ -16,22 +16,6 @@ void xdr_destroy(XDR* xdrs)
|
|||
delete xdrs;
|
||||
}
|
||||
|
||||
void xdr_set_rdma(XDR *xdrs)
|
||||
{
|
||||
xdrs->rdma = true;
|
||||
}
|
||||
|
||||
void xdr_set_rdma_chunk(XDR *xdrs, void *chunk)
|
||||
{
|
||||
assert(!xdrs->rdma_chunk || !chunk);
|
||||
xdrs->rdma_chunk = chunk;
|
||||
}
|
||||
|
||||
void* xdr_get_rdma_chunk(XDR *xdrs)
|
||||
{
|
||||
return xdrs->rdma_chunk;
|
||||
}
|
||||
|
||||
void xdr_reset(XDR *xdrs)
|
||||
{
|
||||
for (auto buf: xdrs->allocs)
|
||||
|
@ -39,9 +23,6 @@ void xdr_reset(XDR *xdrs)
|
|||
free(buf);
|
||||
}
|
||||
xdrs->buf = NULL;
|
||||
xdrs->rdma = false;
|
||||
xdrs->rdma_chunk = NULL;
|
||||
xdrs->rdma_chunk_used = false;
|
||||
xdrs->avail = 0;
|
||||
xdrs->allocs.resize(0);
|
||||
xdrs->in_linked_list.resize(0);
|
||||
|
@ -64,20 +45,6 @@ int xdr_encode(XDR *xdrs, xdrproc_t fn, void *data)
|
|||
return fn(xdrs, data);
|
||||
}
|
||||
|
||||
size_t xdr_encode_get_size(XDR *xdrs)
|
||||
{
|
||||
size_t len = 0;
|
||||
for (auto & buf: xdrs->buf_list)
|
||||
{
|
||||
len += buf.iov_len;
|
||||
}
|
||||
if (xdrs->last_end < xdrs->cur_out.size())
|
||||
{
|
||||
len += xdrs->cur_out.size() - xdrs->last_end;
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
void xdr_encode_finish(XDR *xdrs, iovec **iov_list, unsigned *iov_count)
|
||||
{
|
||||
if (xdrs->last_end < xdrs->cur_out.size())
|
||||
|
@ -116,18 +83,6 @@ void xdr_add_malloc(XDR *xdrs, void *buf)
|
|||
xdrs->allocs.push_back(buf);
|
||||
}
|
||||
|
||||
void xdr_del_malloc(XDR *xdrs, void *buf)
|
||||
{
|
||||
for (int i = 0; i < xdrs->allocs.size(); i++)
|
||||
{
|
||||
if (xdrs->allocs[i] == buf)
|
||||
{
|
||||
xdrs->allocs.erase(xdrs->allocs.begin()+i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
xdr_string_t xdr_copy_string(XDR *xdrs, const std::string & str)
|
||||
{
|
||||
char *cp = (char*)malloc_or_die(str.size()+1);
|
||||
|
|
|
@ -55,15 +55,6 @@ void xdr_destroy(XDR* xdrs);
|
|||
// Free resources from any previous xdr_decode/xdr_encode calls
|
||||
void xdr_reset(XDR *xdrs);
|
||||
|
||||
// Mark XDR as used for RDMA
|
||||
void xdr_set_rdma(XDR *xdrs);
|
||||
|
||||
// Set (single) RDMA chunk buffer for this xdr before decoding an RDMA message
|
||||
void xdr_set_rdma_chunk(XDR *xdrs, void *chunk);
|
||||
|
||||
// Get the current RDMA chunk buffer
|
||||
void* xdr_get_rdma_chunk(XDR *xdrs);
|
||||
|
||||
// Try to decode <size> bytes from buffer <buf> using <fn>
|
||||
// Result may contain memory allocations that will be valid until the next call to xdr_{reset,destroy,decode,encode}
|
||||
int xdr_decode(XDR *xdrs, void *buf, unsigned size, xdrproc_t fn, void *data);
|
||||
|
@ -73,9 +64,6 @@ int xdr_decode(XDR *xdrs, void *buf, unsigned size, xdrproc_t fn, void *data);
|
|||
// May be called multiple times to encode multiple parts of the same message
|
||||
int xdr_encode(XDR *xdrs, xdrproc_t fn, void *data);
|
||||
|
||||
// Get current size of encoded data in <xdrs>
|
||||
size_t xdr_encode_get_size(XDR *xdrs);
|
||||
|
||||
// Get the result of previous xdr_encodes as a list of <struct iovec>'s
|
||||
// in <iov_list> (start) and <iov_count> (count).
|
||||
// The resulting iov_list is valid until the next call to xdr_{reset,destroy}.
|
||||
|
@ -86,9 +74,6 @@ void xdr_encode_finish(XDR *xdrs, iovec **iov_list, unsigned *iov_count);
|
|||
// Remember an allocated buffer to free it later on xdr_reset() or xdr_destroy()
|
||||
void xdr_add_malloc(XDR *xdrs, void *buf);
|
||||
|
||||
// Remove an allocated buffer from XDR
|
||||
void xdr_del_malloc(XDR *xdrs, void *buf);
|
||||
|
||||
xdr_string_t xdr_copy_string(XDR *xdrs, const std::string & str);
|
||||
|
||||
xdr_string_t xdr_copy_string(XDR *xdrs, const char *str);
|
||||
|
|
|
@ -28,19 +28,6 @@
|
|||
// RPC over TCP:
|
||||
//
|
||||
// BE 32bit length, then rpc_msg, then the procedure message itself
|
||||
//
|
||||
// RPC over RDMA:
|
||||
// RFC 8166 - Remote Direct Memory Access Transport for Remote Procedure Call Version 1
|
||||
// RFC 8267 - Network File System (NFS) Upper-Layer Binding to RPC-over-RDMA Version 1
|
||||
// RFC 8797 - Remote Direct Memory Access - Connection Manager (RDMA-CM) Private Data for RPC-over-RDMA Version 1
|
||||
// message is received in an RDMA Receive operation
|
||||
// message: list of read chunks, list of write chunks, optional reply write chunk, then actual RPC body if present
|
||||
// read chunk: BE 32bit position, BE 32bit registered memory key, BE 32bit length, BE 64bit offset
|
||||
// write chunk: BE 32bit registered memory key, BE 32bit length, BE 64bit offset
|
||||
// in reality for NFS 3.0: only 1 read chunk in write3 and symlink3, only 1 write chunk in read3 and readlink3
|
||||
// read chunk is read by the server using RDMA Read from the client memory after receiving RPC request
|
||||
// write chunk is pushed by the server using RDMA Write to the client memory before sending RPC reply
|
||||
// connection is established using RDMA-CM at default port 20049
|
||||
|
||||
#pragma once
|
||||
|
||||
|
@ -48,7 +35,6 @@
|
|||
|
||||
#include <string.h>
|
||||
#include <endian.h>
|
||||
#include <assert.h>
|
||||
#include <vector>
|
||||
|
||||
#include "malloc_or_die.h"
|
||||
|
@ -75,9 +61,6 @@ struct xdr_linked_list_t
|
|||
struct XDR
|
||||
{
|
||||
int x_op;
|
||||
bool rdma = false;
|
||||
void *rdma_chunk = NULL;
|
||||
bool rdma_chunk_used = false;
|
||||
|
||||
// For decoding:
|
||||
uint8_t *buf = NULL;
|
||||
|
@ -123,22 +106,13 @@ inline int xdr_opaque(XDR *xdrs, void *data, uint32_t len)
|
|||
return 1;
|
||||
}
|
||||
|
||||
inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_chunk = false)
|
||||
inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
||||
{
|
||||
if (xdrs->x_op == XDR_DECODE)
|
||||
{
|
||||
if (xdrs->avail < 4)
|
||||
return 0;
|
||||
uint32_t len = be32toh(*((uint32_t*)xdrs->buf));
|
||||
if (rdma_chunk && xdrs->rdma && xdrs->rdma_chunk)
|
||||
{
|
||||
// Take (only a single) RDMA chunk from xdrs->rdma_chunk while decoding
|
||||
assert(!xdrs->rdma_chunk_used);
|
||||
xdrs->rdma_chunk_used = true;
|
||||
data->data = (char*)xdrs->rdma_chunk;
|
||||
data->size = len;
|
||||
return 1;
|
||||
}
|
||||
uint32_t padded = len_pad4(len);
|
||||
if (xdrs->avail < 4+padded)
|
||||
return 0;
|
||||
|
@ -149,8 +123,7 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_c
|
|||
}
|
||||
else
|
||||
{
|
||||
// Always encode RDMA chunks as separate iovecs
|
||||
if (data->size < XDR_COPY_LENGTH && (!rdma_chunk || !xdrs->rdma))
|
||||
if (data->size < XDR_COPY_LENGTH)
|
||||
{
|
||||
unsigned old = xdrs->cur_out.size();
|
||||
xdrs->cur_out.resize(old + 4+data->size);
|
||||
|
@ -173,9 +146,8 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_c
|
|||
.iov_len = data->size,
|
||||
});
|
||||
}
|
||||
if ((data->size & 3) && (!rdma_chunk || !xdrs->rdma))
|
||||
if (data->size & 3)
|
||||
{
|
||||
// No padding for RDMA chunks
|
||||
int pad = 4-(data->size & 3);
|
||||
unsigned old = xdrs->cur_out.size();
|
||||
xdrs->cur_out.resize(old+pad);
|
||||
|
@ -186,9 +158,9 @@ inline int xdr_bytes(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_c
|
|||
return 1;
|
||||
}
|
||||
|
||||
inline int xdr_string(XDR *xdrs, xdr_string_t *data, uint32_t maxlen, bool rdma_chunk = false)
|
||||
inline int xdr_string(XDR *xdrs, xdr_string_t *data, uint32_t maxlen)
|
||||
{
|
||||
return xdr_bytes(xdrs, data, maxlen, rdma_chunk);
|
||||
return xdr_bytes(xdrs, data, maxlen);
|
||||
}
|
||||
|
||||
inline int xdr_u_int(XDR *xdrs, void *data)
|
||||
|
@ -210,11 +182,6 @@ inline int xdr_u_int(XDR *xdrs, void *data)
|
|||
return 1;
|
||||
}
|
||||
|
||||
// XDR codec entry point named after uint32_t.
// Forwards <xdrs> and <data> to xdr_u_int() unchanged and returns its result.
inline int xdr_uint32_t(XDR *xdrs, void *data)
|
||||
{
|
||||
return xdr_u_int(xdrs, data);
|
||||
}
|
||||
|
||||
inline int xdr_enum(XDR *xdrs, void *data)
|
||||
{
|
||||
return xdr_u_int(xdrs, data);
|
||||
|
|
|
@ -1,216 +0,0 @@
|
|||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
//
|
||||
// Simple & stupid RDMA-enabled memory allocator (allocates buffers within ibv_mr's)
|
||||
|
||||
#include <stdio.h>
|
||||
#include <assert.h>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include "rdma_alloc.h"
|
||||
#include "malloc_or_die.h"
|
||||
|
||||
// One malloc()'ed memory block registered with the RDMA device.
// The struct itself is the header of the block (see rdma_malloc_alloc()).
struct rdma_region_t
|
||||
{
|
||||
// Start of the usable area; rdma_malloc_alloc() points it right after this header
void *buf = NULL;
|
||||
// Size of the usable area in bytes
size_t len = 0;
|
||||
// Memory region handle returned by ibv_reg_mr() for [buf, buf+len)
ibv_mr *mr = NULL;
|
||||
};
|
||||
|
||||
// A contiguous piece of a region, either handed out to a caller or free.
// Fragments are stored in rdma_allocator_t::frags keyed by their start address.
struct rdma_frag_t
|
||||
{
|
||||
// Region this fragment belongs to
rdma_region_t *rgn = NULL;
|
||||
// Fragment length in bytes
size_t len = 0;
|
||||
// true while the fragment is available for allocation
bool is_free = false;
|
||||
};
|
||||
|
||||
// Freelist entry describing one free fragment.
// <len> is declared first because entries are ordered by length, then by address
// (see operator < below), which lets lower_bound() find the smallest fitting fragment.
struct rdma_free_t
|
||||
{
|
||||
// Length of the free fragment in bytes
size_t len = 0;
|
||||
// Start address of the free fragment (the key used in rdma_allocator_t::frags)
void *buf = NULL;
|
||||
};
|
||||
|
||||
inline bool operator < (const rdma_free_t &a, const rdma_free_t &b)
|
||||
{
|
||||
return a.len < b.len || a.len == b.len && a.buf < b.buf;
|
||||
}
|
||||
|
||||
// Allocator state: the set of registered regions plus fragment bookkeeping.
struct rdma_allocator_t
|
||||
{
|
||||
// Granularity of new regions: requested sizes are rounded up to a multiple of this
size_t rdma_alloc_size = 1048576;
|
||||
// When the total size of completely free regions exceeds this, rdma_malloc_free() releases some of them
size_t rdma_max_unused = 500*1048576;
|
||||
// Access flags passed to ibv_reg_mr() for every new region
int rdma_access = IBV_ACCESS_LOCAL_WRITE;
|
||||
// Protection domain in which regions are registered
ibv_pd *pd = NULL;
|
||||
|
||||
// All currently registered regions
std::set<rdma_region_t*> regions;
|
||||
// All fragments (free and allocated), keyed by start address
std::map<void*, rdma_frag_t> frags;
|
||||
// Free fragments ordered by (length, address)
std::set<rdma_free_t> freelist;
|
||||
// Total size in bytes of regions that are currently entirely free
size_t freebuffers = 0;
|
||||
};
|
||||
|
||||
// Create an allocator that registers its buffers in protection domain <pd>
// with access flags <rdma_access>.
// Passing 0 for <rdma_alloc_size> or <rdma_max_unused> selects the built-in
// defaults (1048576 and 500*1048576 bytes respectively).
rdma_allocator_t *rdma_malloc_create(ibv_pd *pd, size_t rdma_alloc_size, size_t rdma_max_unused, int rdma_access)
{
    if (!rdma_alloc_size)
    {
        rdma_alloc_size = 1048576;
    }
    if (!rdma_max_unused)
    {
        rdma_max_unused = 500*1048576;
    }
    rdma_allocator_t *allocator = new rdma_allocator_t();
    allocator->pd = pd;
    allocator->rdma_alloc_size = rdma_alloc_size;
    allocator->rdma_max_unused = rdma_max_unused;
    allocator->rdma_access = rdma_access;
    return allocator;
}
|
||||
|
||||
// Release completely free regions, walking the freelist from the largest entry down,
// until the total size of completely free regions is at most <max_unused>.
//
// The walk stops at the first freelist entry that does not cover its whole region.
// With <force> set (used by rdma_malloc_destroy()) such an entry means some buffer
// of that region is still allocated, which is reported as a bug and aborts the process.
//
// At least one entry is examined per call, so callers check the threshold themselves.
static void rdma_malloc_free_unused_buffers(rdma_allocator_t *self, size_t max_unused, bool force)
{
    auto free_it = self->freelist.end();
    if (free_it == self->freelist.begin())
        return;
    free_it--;
    do
    {
        auto frag_it = self->frags.find(free_it->buf);
        assert(frag_it != self->frags.end());
        rdma_region_t *rgn = frag_it->second.rgn;
        if (frag_it->second.len != rgn->len)
        {
            if (force)
            {
                fprintf(stderr, "BUG: Attempt to destroy RDMA allocator while buffers are not freed yet\n");
                abort();
            }
            break;
        }
        self->freebuffers -= rgn->len;
        ibv_dereg_mr(rgn->mr);
        // Drop every reference to the region from the bookkeeping BEFORE releasing
        // its memory, so that no dangling pointer value is ever used as a lookup key
        self->regions.erase(rgn);
        self->frags.erase(frag_it);
        free(rgn);
        if (free_it == self->freelist.begin())
        {
            self->freelist.erase(free_it);
            break;
        }
        // Post-decrement: step to the next smaller entry before the current one is erased
        self->freelist.erase(free_it--);
    } while (self->freebuffers > max_unused);
}
|
||||
|
||||
// Destroy the allocator created by rdma_malloc_create().
// All free regions are released first (in forced mode, see
// rdma_malloc_free_unused_buffers()); the assertions then verify that
// no region, fragment or freelist entry is left behind.
void rdma_malloc_destroy(rdma_allocator_t *self)
{
    const size_t keep_unused = 0;
    rdma_malloc_free_unused_buffers(self, keep_unused, true);
    assert(self->freebuffers == 0);
    assert(self->regions.empty());
    assert(self->frags.empty());
    assert(self->freelist.empty());
    delete self;
}
|
||||
|
||||
// Allocate <size> bytes inside a memory region registered with the RDMA device.
//
// The smallest free fragment that fits is used. If none fits, a new region is
// malloc()'ed (size rounded up to a multiple of rdma_alloc_size) and registered
// with ibv_reg_mr(); the process exits if the registration fails.
// When the chosen fragment is larger than requested, the buffer is carved from
// its END and the remainder stays free at the fragment's original start address.
//
// Returns the start of the allocated buffer. Release it with rdma_malloc_free().
void *rdma_malloc_alloc(rdma_allocator_t *self, size_t size)
{
    if (!size)
    {
        // A zero-length fragment would be keyed at the address of the following
        // fragment and overwrite its bookkeeping entry, so allocate at least 1 byte
        size = 1;
    }
    // Entries are ordered by (len, buf), so {size, NULL} finds the smallest fragment with len >= size
    auto it = self->freelist.lower_bound((rdma_free_t){ .len = size });
    if (it == self->freelist.end())
    {
        // round size up to rdma_alloc_size (1 MB by default)
        size_t alloc_size = ((size + self->rdma_alloc_size - 1) / self->rdma_alloc_size) * self->rdma_alloc_size;
        // The region header and the buffer live in the same malloc() block
        rdma_region_t *r = (rdma_region_t*)malloc_or_die(alloc_size + sizeof(rdma_region_t));
        r->buf = r+1;
        r->len = alloc_size;
        r->mr = ibv_reg_mr(self->pd, r->buf, r->len, self->rdma_access);
        if (!r->mr)
        {
            fprintf(stderr, "Failed to register RDMA memory region: %s\n", strerror(errno));
            exit(1);
        }
        self->regions.insert(r);
        self->frags[r->buf] = (rdma_frag_t){ .rgn = r, .len = alloc_size, .is_free = true };
        it = self->freelist.insert((rdma_free_t){ .len = alloc_size, .buf = r->buf }).first;
        self->freebuffers += alloc_size;
    }
    void *frag_start = it->buf;
    auto & frag = self->frags.at(frag_start);
    self->freelist.erase(it);
    assert(frag.len >= size && frag.is_free);
    if (frag.len == frag.rgn->len)
    {
        // The region was entirely free until now, so it no longer counts as unused
        self->freebuffers -= frag.rgn->len;
    }
    if (frag.len == size)
    {
        // Exact fit: hand out the whole fragment
        frag.is_free = false;
        return frag_start;
    }
    // Split: keep the head free, hand out the tail
    frag.len -= size;
    void *ptr = (uint8_t*)frag_start + frag.len;
    // The remainder keeps its own start address, which is not necessarily the
    // start of the region when the fragment lies in the middle of it
    self->freelist.insert((rdma_free_t){ .len = frag.len, .buf = frag_start });
    self->frags[ptr] = (rdma_frag_t){ .rgn = frag.rgn, .len = size, .is_free = false };
    return ptr;
}
|
||||
|
||||
// Return a buffer obtained from rdma_malloc_alloc() to the allocator.
//
// The freed fragment is merged with directly adjacent free fragments of the same
// region, and the freelist is updated so that it always contains exactly one entry
// per free fragment with its current length. When a region becomes entirely free
// and the total size of such regions exceeds rdma_max_unused, unused regions are
// released.
//
// An unknown or already free <buf> is reported to stderr and otherwise ignored.
void rdma_malloc_free(rdma_allocator_t *self, void *buf)
{
    auto frag_it = self->frags.find(buf);
    if (frag_it == self->frags.end() || frag_it->second.is_free)
    {
        // Either the fragment was already merged away or it is still tracked as free;
        // in both cases freeing it again would corrupt the bookkeeping
        fprintf(stderr, "BUG: Attempt to double-free RDMA buffer fragment 0x%zx\n", (size_t)buf);
        return;
    }
    auto prev_it = frag_it, next_it = frag_it;
    if (frag_it != self->frags.begin())
        prev_it--;
    next_it++;
    bool merge_back = prev_it != frag_it &&
        prev_it->second.is_free &&
        prev_it->second.rgn == frag_it->second.rgn &&
        (uint8_t*)prev_it->first+prev_it->second.len == frag_it->first;
    bool merge_next = next_it != self->frags.end() &&
        next_it->second.is_free &&
        next_it->second.rgn == frag_it->second.rgn &&
        next_it->first == (uint8_t*)frag_it->first+frag_it->second.len;
    if (merge_next)
    {
        // Absorb the following free fragment; its freelist entry disappears with it
        self->freelist.erase((rdma_free_t){ .len = next_it->second.len, .buf = next_it->first });
        frag_it->second.len += next_it->second.len;
        self->frags.erase(next_it);
    }
    if (merge_back)
    {
        // The freelist is keyed by length, so the previous fragment's entry must be
        // removed while its old length is still known and re-inserted after growing
        self->freelist.erase((rdma_free_t){ .len = prev_it->second.len, .buf = prev_it->first });
        prev_it->second.len += frag_it->second.len;
        self->frags.erase(frag_it);
        frag_it = prev_it;
    }
    frag_it->second.is_free = true;
    self->freelist.insert((rdma_free_t){ .len = frag_it->second.len, .buf = frag_it->first });
    assert(frag_it->second.len <= frag_it->second.rgn->len);
    if (frag_it->second.len == frag_it->second.rgn->len)
    {
        // The whole buffer is freed
        self->freebuffers += frag_it->second.rgn->len;
        if (self->freebuffers > self->rdma_max_unused)
        {
            // May release the region, so frag_it must not be used afterwards
            rdma_malloc_free_unused_buffers(self, self->rdma_max_unused, false);
        }
    }
}
|
||||
|
||||
// Return the local key (lkey) of the memory region that contains address <buf>.
// <buf> may point anywhere inside a tracked fragment, not only at its start.
// An address outside of every tracked fragment is reported as a bug and aborts the process.
uint32_t rdma_malloc_get_lkey(rdma_allocator_t *self, void *buf)
{
    // upper_bound() yields the first fragment starting AFTER <buf>,
    // so the candidate containing <buf> is the one right before it
    auto candidate = self->frags.upper_bound(buf);
    if (candidate != self->frags.begin())
    {
        candidate--;
        uint8_t *frag_end = (uint8_t*)candidate->first + candidate->second.len;
        if (frag_end > buf)
        {
            return candidate->second.rgn->mr->lkey;
        }
    }
    fprintf(stderr, "BUG: Attempt to use an unknown RDMA buffer fragment 0x%zx\n", (size_t)buf);
    abort();
}
|
|
@ -1,17 +0,0 @@
|
|||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
//
|
||||
// Simple & stupid RDMA-enabled memory allocator (allocates buffers within ibv_mr's)
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <infiniband/verbs.h>
|
||||
#include <stdint.h>
|
||||
|
||||
struct rdma_allocator_t;
|
||||
|
||||
// Create an allocator registering buffers in <pd> with access flags <rdma_access>.
// Zero <rdma_alloc_size> / <rdma_max_unused> select the built-in defaults.
rdma_allocator_t *rdma_malloc_create(ibv_pd *pd, size_t rdma_alloc_size, size_t rdma_max_unused, int rdma_access);
|
||||
// Destroy the allocator; all buffers are expected to be freed beforehand
void rdma_malloc_destroy(rdma_allocator_t *self);
|
||||
// Allocate <size> bytes inside a registered memory region
void *rdma_malloc_alloc(rdma_allocator_t *self, size_t size);
|
||||
// Return a buffer obtained from rdma_malloc_alloc() to the allocator
void rdma_malloc_free(rdma_allocator_t *self, void *buf);
|
||||
// Get the lkey of the memory region containing <buf>; aborts if <buf> is unknown
uint32_t rdma_malloc_get_lkey(rdma_allocator_t *self, void *buf);
|
Loading…
Reference in New Issue