Implement Stage/Unstage & volume locking for CSI to prevent parallel modifications of the same volume
parent
3629dbc54d
commit
02993ee1dd
|
@ -6,8 +6,12 @@ package vitastor
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
@ -29,6 +33,9 @@ type NodeServer struct
|
|||
stateDir string
|
||||
mounter mount.Interface
|
||||
restartInterval time.Duration
|
||||
mu sync.Mutex
|
||||
cond *sync.Cond
|
||||
volumeLocks map[string]bool
|
||||
}
|
||||
|
||||
type DeviceState struct
|
||||
|
@ -58,7 +65,9 @@ func NewNodeServer(driver *Driver) *NodeServer
|
|||
useVduse: checkVduseSupport(),
|
||||
stateDir: stateDir,
|
||||
mounter: mount.New(""),
|
||||
volumeLocks: make(map[string]bool),
|
||||
}
|
||||
ns.cond = sync.NewCond(&ns.mu)
|
||||
if (ns.useVduse)
|
||||
{
|
||||
ns.restoreVduseDaemons()
|
||||
|
@ -76,16 +85,24 @@ func NewNodeServer(driver *Driver) *NodeServer
|
|||
return ns
|
||||
}
|
||||
|
||||
// NodeStageVolume mounts the volume to a staging path on the node.
|
||||
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error)
|
||||
func (ns *NodeServer) lockVolume(lockId string)
|
||||
{
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
ns.mu.Lock()
|
||||
defer ns.mu.Unlock()
|
||||
for (ns.volumeLocks[lockId])
|
||||
{
|
||||
ns.cond.Wait()
|
||||
}
|
||||
ns.volumeLocks[lockId] = true
|
||||
ns.cond.Broadcast()
|
||||
}
|
||||
|
||||
// NodeUnstageVolume unstages the volume from the staging path
|
||||
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error)
|
||||
func (ns *NodeServer) unlockVolume(lockId string)
|
||||
{
|
||||
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||
ns.mu.Lock()
|
||||
defer ns.mu.Unlock()
|
||||
delete(ns.volumeLocks, lockId)
|
||||
ns.cond.Broadcast()
|
||||
}
|
||||
|
||||
func (ns *NodeServer) restarter()
|
||||
|
@ -134,58 +151,83 @@ func (ns *NodeServer) restoreVduseDaemons()
|
|||
vdpaId := filepath.Base(stateFile)
|
||||
vdpaId = vdpaId[0:len(vdpaId)-5]
|
||||
// Check if VDPA device is still added to the bus
|
||||
if (devs[vdpaId] != nil)
|
||||
{
|
||||
// Check if the storage daemon is still active
|
||||
pidFile := ns.stateDir + vdpaId + ".pid"
|
||||
exists := false
|
||||
proc, err := findByPidFile(pidFile)
|
||||
if (err == nil)
|
||||
{
|
||||
exists = proc.Signal(syscall.Signal(0)) == nil
|
||||
}
|
||||
if (!exists)
|
||||
{
|
||||
// Restart daemon
|
||||
stateJSON, err := os.ReadFile(stateFile)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("error reading state file %v: %v", stateFile, err)
|
||||
}
|
||||
else
|
||||
{
|
||||
var state DeviceState
|
||||
err := json.Unmarshal(stateJSON, &state)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("state file %v contains invalid JSON (error %v): %v", stateFile, err, string(stateJSON))
|
||||
}
|
||||
else
|
||||
{
|
||||
klog.Warningf("restarting storage daemon for volume %v (VDPA ID %v)", state.Image, vdpaId)
|
||||
_ = startStorageDaemon(vdpaId, state.Image, pidFile, state.ConfigPath, state.Readonly)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
if (devs[vdpaId] == nil)
|
||||
{
|
||||
// Unused, clean it up
|
||||
unmapVduseById(ns.stateDir, vdpaId)
|
||||
continue
|
||||
}
|
||||
|
||||
stateJSON, err := os.ReadFile(stateFile)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("error reading state file %v: %v", stateFile, err)
|
||||
continue
|
||||
}
|
||||
var state DeviceState
|
||||
err = json.Unmarshal(stateJSON, &state)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("state file %v contains invalid JSON (error %v): %v", stateFile, err, string(stateJSON))
|
||||
continue
|
||||
}
|
||||
|
||||
ns.lockVolume(state.ConfigPath+":"+state.Image)
|
||||
|
||||
// Recheck state file after locking
|
||||
_, err = os.ReadFile(stateFile)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Warningf("state file %v disappeared, skipping volume", stateFile)
|
||||
ns.unlockVolume(state.ConfigPath+":"+state.Image)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the storage daemon is still active
|
||||
pidFile := ns.stateDir + vdpaId + ".pid"
|
||||
exists := false
|
||||
proc, err := findByPidFile(pidFile)
|
||||
if (err == nil)
|
||||
{
|
||||
exists = proc.Signal(syscall.Signal(0)) == nil
|
||||
}
|
||||
if (!exists)
|
||||
{
|
||||
// Restart daemon
|
||||
klog.Warningf("restarting storage daemon for volume %v (VDPA ID %v)", state.Image, vdpaId)
|
||||
_ = startStorageDaemon(vdpaId, state.Image, pidFile, state.ConfigPath, state.Readonly)
|
||||
}
|
||||
|
||||
ns.unlockVolume(state.ConfigPath+":"+state.Image)
|
||||
}
|
||||
}
|
||||
|
||||
// NodePublishVolume mounts the volume mounted to the staging path to the target path
|
||||
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error)
|
||||
// NodeStageVolume mounts the volume to a staging path on the node.
|
||||
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error)
|
||||
{
|
||||
klog.Infof("received node publish volume request %+v", protosanitizer.StripSecrets(req))
|
||||
klog.Infof("received node stage volume request %+v", protosanitizer.StripSecrets(req))
|
||||
|
||||
targetPath := req.GetTargetPath()
|
||||
ctxVars := make(map[string]string)
|
||||
err := json.Unmarshal([]byte(req.VolumeId), &ctxVars)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
|
||||
}
|
||||
_, err = GetConnectionParams(ctxVars)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
volName := ctxVars["name"]
|
||||
|
||||
ns.lockVolume(ctxVars["configPath"]+":"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
|
||||
|
||||
targetPath := req.GetStagingTargetPath()
|
||||
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||
|
||||
// Check that it's not already mounted
|
||||
_, err := mount.IsNotMountPoint(ns.mounter, targetPath)
|
||||
_, err = mount.IsNotMountPoint(ns.mounter, targetPath)
|
||||
if (err != nil)
|
||||
{
|
||||
if (os.IsNotExist(err))
|
||||
|
@ -221,28 +263,14 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
}
|
||||
}
|
||||
|
||||
ctxVars := make(map[string]string)
|
||||
err = json.Unmarshal([]byte(req.VolumeId), &ctxVars)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
|
||||
}
|
||||
volName := ctxVars["name"]
|
||||
|
||||
_, err = GetConnectionParams(ctxVars)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var devicePath, vdpaId string
|
||||
if (!ns.useVduse)
|
||||
{
|
||||
devicePath, err = mapNbd(volName, ctxVars, req.GetReadonly())
|
||||
devicePath, err = mapNbd(volName, ctxVars, false)
|
||||
}
|
||||
else
|
||||
{
|
||||
devicePath, vdpaId, err = mapVduse(ns.stateDir, volName, ctxVars, req.GetReadonly())
|
||||
devicePath, vdpaId, err = mapVduse(ns.stateDir, volName, ctxVars, false)
|
||||
}
|
||||
if (err != nil)
|
||||
{
|
||||
|
@ -326,7 +354,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
)
|
||||
goto unmap
|
||||
}
|
||||
return &csi.NodePublishVolumeResponse{}, nil
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
|
||||
unmap:
|
||||
if (!ns.useVduse || len(devicePath) >= 8 && devicePath[0:8] == "/dev/nbd")
|
||||
|
@ -340,12 +368,168 @@ unmap:
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// NodeUnstageVolume unstages the volume from the staging path
|
||||
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error)
|
||||
{
|
||||
klog.Infof("received node unstage volume request %+v", protosanitizer.StripSecrets(req))
|
||||
|
||||
ctxVars := make(map[string]string)
|
||||
err := json.Unmarshal([]byte(req.VolumeId), &ctxVars)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
|
||||
}
|
||||
volName := ctxVars["name"]
|
||||
|
||||
ns.lockVolume(ctxVars["configPath"]+":"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
|
||||
|
||||
targetPath := req.GetStagingTargetPath()
|
||||
devicePath, refCount, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath)
|
||||
if (err != nil)
|
||||
{
|
||||
if (os.IsNotExist(err))
|
||||
{
|
||||
return nil, status.Error(codes.NotFound, "Target path not found")
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if (devicePath == "")
|
||||
{
|
||||
// volume not mounted
|
||||
klog.Warningf("%s is not a mountpoint, deleting", targetPath)
|
||||
os.Remove(targetPath)
|
||||
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// unmount
|
||||
err = mount.CleanupMountPoint(targetPath, ns.mounter, false)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// unmap device
|
||||
if (refCount == 1)
|
||||
{
|
||||
if (!ns.useVduse)
|
||||
{
|
||||
unmapNbd(devicePath)
|
||||
}
|
||||
else
|
||||
{
|
||||
unmapVduse(ns.stateDir, devicePath)
|
||||
}
|
||||
}
|
||||
|
||||
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// NodePublishVolume mounts the volume mounted to the staging path to the target path
|
||||
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error)
|
||||
{
|
||||
klog.Infof("received node publish volume request %+v", protosanitizer.StripSecrets(req))
|
||||
|
||||
ctxVars := make(map[string]string)
|
||||
err := json.Unmarshal([]byte(req.VolumeId), &ctxVars)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
|
||||
}
|
||||
_, err = GetConnectionParams(ctxVars)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
volName := ctxVars["name"]
|
||||
|
||||
ns.lockVolume(ctxVars["configPath"]+":"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
|
||||
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
targetPath := req.GetTargetPath()
|
||||
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||
|
||||
// Check that stagingTargetPath is mounted
|
||||
_, err = mount.IsNotMountPoint(ns.mounter, stagingTargetPath)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("staging path %v is not mounted: %v", stagingTargetPath, err)
|
||||
return nil, fmt.Errorf("staging path %v is not mounted: %v", stagingTargetPath, err)
|
||||
}
|
||||
|
||||
// Check that targetPath is not already mounted
|
||||
_, err = mount.IsNotMountPoint(ns.mounter, targetPath)
|
||||
if (err != nil)
|
||||
{
|
||||
if (os.IsNotExist(err))
|
||||
{
|
||||
if (isBlock)
|
||||
{
|
||||
pathFile, err := os.OpenFile(targetPath, os.O_CREATE|os.O_RDWR, 0o600)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to create block device mount target %s with error: %v", targetPath, err)
|
||||
return nil, err
|
||||
}
|
||||
err = pathFile.Close()
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to close %s with error: %v", targetPath, err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
err := os.MkdirAll(targetPath, 0777)
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to create fs mount target %s with error: %v", targetPath, err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
execArgs := []string{"--bind", stagingTargetPath, targetPath}
|
||||
if (req.GetReadonly())
|
||||
{
|
||||
execArgs = append(execArgs, "-o", "ro")
|
||||
}
|
||||
cmd := exec.Command("mount", execArgs...)
|
||||
cmd.Stderr = os.Stderr
|
||||
klog.Infof("binding volume %v (%v) from %v to %v", volName, ctxVars["configPath"], stagingTargetPath, targetPath)
|
||||
out, err := cmd.Output()
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, fmt.Errorf("Error running mount %v: %s", strings.Join(execArgs, " "), out)
|
||||
}
|
||||
|
||||
return &csi.NodePublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// NodeUnpublishVolume unmounts the volume from the target path
|
||||
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error)
|
||||
{
|
||||
klog.Infof("received node unpublish volume request %+v", protosanitizer.StripSecrets(req))
|
||||
|
||||
ctxVars := make(map[string]string)
|
||||
err := json.Unmarshal([]byte(req.VolumeId), &ctxVars)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, "volume ID not in JSON format")
|
||||
}
|
||||
volName := ctxVars["name"]
|
||||
|
||||
ns.lockVolume(ctxVars["configPath"]+":"+volName)
|
||||
defer ns.unlockVolume(ctxVars["configPath"]+":"+volName)
|
||||
|
||||
targetPath := req.GetTargetPath()
|
||||
devicePath, refCount, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath)
|
||||
devicePath, _, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath)
|
||||
if (err != nil)
|
||||
{
|
||||
if (os.IsNotExist(err))
|
||||
|
@ -361,24 +545,14 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
|||
os.Remove(targetPath)
|
||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// unmount
|
||||
err = mount.CleanupMountPoint(targetPath, ns.mounter, false)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
// unmap NBD device
|
||||
if (refCount == 1)
|
||||
{
|
||||
if (!ns.useVduse)
|
||||
{
|
||||
unmapNbd(devicePath)
|
||||
}
|
||||
else
|
||||
{
|
||||
unmapVduse(ns.stateDir, devicePath)
|
||||
}
|
||||
}
|
||||
|
||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
|
@ -397,7 +571,17 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
|
|||
// NodeGetCapabilities returns the supported capabilities of the node server
|
||||
func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error)
|
||||
{
|
||||
return &csi.NodeGetCapabilitiesResponse{}, nil
|
||||
return &csi.NodeGetCapabilitiesResponse{
|
||||
Capabilities: []*csi.NodeServiceCapability{
|
||||
&csi.NodeServiceCapability{
|
||||
Type: &csi.NodeServiceCapability_Rpc{
|
||||
Rpc: &csi.NodeServiceCapability_RPC{
|
||||
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NodeGetInfo returns NodeGetInfoResponse for CO.
|
||||
|
|
Loading…
Reference in New Issue