diff --git a/csi/src/nodeserver.go b/csi/src/nodeserver.go index 04e7e1fb..78a5ab39 100644 --- a/csi/src/nodeserver.go +++ b/csi/src/nodeserver.go @@ -6,8 +6,12 @@ package vitastor import ( "context" "encoding/json" + "fmt" "os" + "os/exec" "path/filepath" + "strings" + "sync" "syscall" "time" @@ -29,6 +33,9 @@ type NodeServer struct stateDir string mounter mount.Interface restartInterval time.Duration + mu sync.Mutex + cond *sync.Cond + volumeLocks map[string]bool } type DeviceState struct @@ -58,7 +65,9 @@ func NewNodeServer(driver *Driver) *NodeServer useVduse: checkVduseSupport(), stateDir: stateDir, mounter: mount.New(""), + volumeLocks: make(map[string]bool), } + ns.cond = sync.NewCond(&ns.mu) if (ns.useVduse) { ns.restoreVduseDaemons() @@ -76,16 +85,24 @@ func NewNodeServer(driver *Driver) *NodeServer return ns } -// NodeStageVolume mounts the volume to a staging path on the node. -func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) +func (ns *NodeServer) lockVolume(lockId string) { - return &csi.NodeStageVolumeResponse{}, nil + ns.mu.Lock() + defer ns.mu.Unlock() + for (ns.volumeLocks[lockId]) + { + ns.cond.Wait() + } + ns.volumeLocks[lockId] = true + ns.cond.Broadcast() } -// NodeUnstageVolume unstages the volume from the staging path -func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) +func (ns *NodeServer) unlockVolume(lockId string) { - return &csi.NodeUnstageVolumeResponse{}, nil + ns.mu.Lock() + defer ns.mu.Unlock() + delete(ns.volumeLocks, lockId) + ns.cond.Broadcast() } func (ns *NodeServer) restarter() @@ -134,58 +151,83 @@ func (ns *NodeServer) restoreVduseDaemons() vdpaId := filepath.Base(stateFile) vdpaId = vdpaId[0:len(vdpaId)-5] // Check if VDPA device is still added to the bus - if (devs[vdpaId] != nil) - { - // Check if the storage daemon is still active - pidFile := ns.stateDir + vdpaId + ".pid" - exists := false - proc, err := findByPidFile(pidFile) - if (err == nil) - { - exists = proc.Signal(syscall.Signal(0)) == nil - } - if (!exists) - { - // Restart daemon - stateJSON, err := os.ReadFile(stateFile) - if (err != nil) - { - klog.Warningf("error reading state file %v: %v", stateFile, err) - } - else - { - var state DeviceState - err := json.Unmarshal(stateJSON, &state) - if (err != nil) - { - klog.Warningf("state file %v contains invalid JSON (error %v): %v", stateFile, err, string(stateJSON)) - } - else - { - klog.Warningf("restarting storage daemon for volume %v (VDPA ID %v)", state.Image, vdpaId) - _ = startStorageDaemon(vdpaId, state.Image, pidFile, state.ConfigPath, state.Readonly) - } - } - } - } - else + if (devs[vdpaId] == nil) { // Unused, clean it up unmapVduseById(ns.stateDir, vdpaId) + continue } + + stateJSON, err := os.ReadFile(stateFile) + if (err != nil) + { + klog.Warningf("error reading state file %v: %v", stateFile, err) + continue + } + var state DeviceState + err = json.Unmarshal(stateJSON, &state) + if (err != nil) + { + klog.Warningf("state file %v contains invalid JSON (error %v): %v", stateFile, err, string(stateJSON)) + continue + } + + ns.lockVolume(state.ConfigPath+":"+state.Image) + + // Recheck state file after locking + _, err = os.ReadFile(stateFile) + if (err != nil) + { + klog.Warningf("state file %v disappeared, skipping volume", stateFile) + ns.unlockVolume(state.ConfigPath+":"+state.Image) + continue + } + + // Check if the storage daemon is still active + pidFile := ns.stateDir + vdpaId + ".pid" + exists := false + proc, err := findByPidFile(pidFile) + if (err == nil) + { + exists = proc.Signal(syscall.Signal(0)) == nil + } + if (!exists) + { + // Restart daemon + klog.Warningf("restarting storage daemon for volume %v (VDPA ID %v)", state.Image, vdpaId) + _ = startStorageDaemon(vdpaId, state.Image, pidFile, state.ConfigPath, state.Readonly) + } + + ns.unlockVolume(state.ConfigPath+":"+state.Image) } } -// NodePublishVolume mounts the volume mounted to the staging path to the target path -func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) +// NodeStageVolume mounts the volume to a staging path on the node. +func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - klog.Infof("received node publish volume request %+v", protosanitizer.StripSecrets(req)) + klog.Infof("received node stage volume request %+v", protosanitizer.StripSecrets(req)) - targetPath := req.GetTargetPath() + ctxVars := make(map[string]string) + err := json.Unmarshal([]byte(req.VolumeId), &ctxVars) + if (err != nil) + { + return nil, status.Error(codes.Internal, "volume ID not in JSON format") + } + _, err = GetConnectionParams(ctxVars) + if (err != nil) + { + return nil, err + } + volName := ctxVars["name"] + + ns.lockVolume(ctxVars["configPath"]+":"+volName) + defer ns.unlockVolume(ctxVars["configPath"]+":"+volName) + + targetPath := req.GetStagingTargetPath() isBlock := req.GetVolumeCapability().GetBlock() != nil // Check that it's not already mounted - _, err := mount.IsNotMountPoint(ns.mounter, targetPath) + _, err = mount.IsNotMountPoint(ns.mounter, targetPath) if (err != nil) { if (os.IsNotExist(err)) @@ -221,28 +263,14 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } } - ctxVars := make(map[string]string) - err = json.Unmarshal([]byte(req.VolumeId), &ctxVars) - if (err != nil) - { - return nil, status.Error(codes.Internal, "volume ID not in JSON format") - } - volName := ctxVars["name"] - - _, err = GetConnectionParams(ctxVars) - if (err != nil) - { - return nil, err - } - var devicePath, vdpaId string if (!ns.useVduse) { - devicePath, err = mapNbd(volName, ctxVars, req.GetReadonly()) + devicePath, err = mapNbd(volName, ctxVars, false) } else { - devicePath, vdpaId, err = mapVduse(ns.stateDir, volName, ctxVars, req.GetReadonly()) + devicePath, vdpaId, err = mapVduse(ns.stateDir, volName, ctxVars, false) } if (err != nil) { @@ -326,7 +354,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis ) goto unmap } - return &csi.NodePublishVolumeResponse{}, nil + return &csi.NodeStageVolumeResponse{}, nil unmap: if (!ns.useVduse || len(devicePath) >= 8 && devicePath[0:8] == "/dev/nbd") @@ -340,12 +368,168 @@ unmap: return nil, err } +// NodeUnstageVolume unstages the volume from the staging path +func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) +{ + klog.Infof("received node unstage volume request %+v", protosanitizer.StripSecrets(req)) + + ctxVars := make(map[string]string) + err := json.Unmarshal([]byte(req.VolumeId), &ctxVars) + if (err != nil) + { + return nil, status.Error(codes.Internal, "volume ID not in JSON format") + } + volName := ctxVars["name"] + + ns.lockVolume(ctxVars["configPath"]+":"+volName) + defer ns.unlockVolume(ctxVars["configPath"]+":"+volName) + + targetPath := req.GetStagingTargetPath() + devicePath, refCount, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath) + if (err != nil) + { + if (os.IsNotExist(err)) + { + return nil, status.Error(codes.NotFound, "Target path not found") + } + return nil, err + } + if (devicePath == "") + { + // volume not mounted + klog.Warningf("%s is not a mountpoint, deleting", targetPath) + os.Remove(targetPath) + return &csi.NodeUnstageVolumeResponse{}, nil + } + + // unmount + err = mount.CleanupMountPoint(targetPath, ns.mounter, false) + if (err != nil) + { + return nil, err + } + + // unmap device + if (refCount == 1) + { + if (!ns.useVduse) + { + unmapNbd(devicePath) + } + else + { + unmapVduse(ns.stateDir, devicePath) + } + } + + return &csi.NodeUnstageVolumeResponse{}, nil +} + +// NodePublishVolume mounts the volume mounted to the staging path to the target path +func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) +{ + klog.Infof("received node publish volume request %+v", protosanitizer.StripSecrets(req)) + + ctxVars := make(map[string]string) + err := json.Unmarshal([]byte(req.VolumeId), &ctxVars) + if (err != nil) + { + return nil, status.Error(codes.Internal, "volume ID not in JSON format") + } + _, err = GetConnectionParams(ctxVars) + if (err != nil) + { + return nil, err + } + volName := ctxVars["name"] + + ns.lockVolume(ctxVars["configPath"]+":"+volName) + defer ns.unlockVolume(ctxVars["configPath"]+":"+volName) + + stagingTargetPath := req.GetStagingTargetPath() + targetPath := req.GetTargetPath() + isBlock := req.GetVolumeCapability().GetBlock() != nil + + // Check that stagingTargetPath is mounted + _, err = mount.IsNotMountPoint(ns.mounter, stagingTargetPath) + if (err != nil) + { + klog.Errorf("staging path %v is not mounted: %v", stagingTargetPath, err) + return nil, fmt.Errorf("staging path %v is not mounted: %v", stagingTargetPath, err) + } + + // Check that targetPath is not already mounted + _, err = mount.IsNotMountPoint(ns.mounter, targetPath) + if (err != nil) + { + if (os.IsNotExist(err)) + { + if (isBlock) + { + pathFile, err := os.OpenFile(targetPath, os.O_CREATE|os.O_RDWR, 0o600) + if (err != nil) + { + klog.Errorf("failed to create block device mount target %s with error: %v", targetPath, err) + return nil, err + } + err = pathFile.Close() + if (err != nil) + { + klog.Errorf("failed to close %s with error: %v", targetPath, err) + return nil, err + } + } + else + { + err := os.MkdirAll(targetPath, 0777) + if (err != nil) + { + klog.Errorf("failed to create fs mount target %s with error: %v", targetPath, err) + return nil, err + } + } + } + else + { + return nil, err + } + } + + execArgs := []string{"--bind", stagingTargetPath, targetPath} + if (req.GetReadonly()) + { + execArgs = append(execArgs, "-o", "ro") + } + cmd := exec.Command("mount", execArgs...) + cmd.Stderr = os.Stderr + klog.Infof("binding volume %v (%v) from %v to %v", volName, ctxVars["configPath"], stagingTargetPath, targetPath) + out, err := cmd.Output() + if (err != nil) + { + return nil, fmt.Errorf("Error running mount %v: %s", strings.Join(execArgs, " "), out) + } + + return &csi.NodePublishVolumeResponse{}, nil +} + // NodeUnpublishVolume unmounts the volume from the target path func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { klog.Infof("received node unpublish volume request %+v", protosanitizer.StripSecrets(req)) + + ctxVars := make(map[string]string) + err := json.Unmarshal([]byte(req.VolumeId), &ctxVars) + if (err != nil) + { + return nil, status.Error(codes.Internal, "volume ID not in JSON format") + } + volName := ctxVars["name"] + + ns.lockVolume(ctxVars["configPath"]+":"+volName) + defer ns.unlockVolume(ctxVars["configPath"]+":"+volName) + targetPath := req.GetTargetPath() - devicePath, refCount, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath) + devicePath, _, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath) if (err != nil) { if (os.IsNotExist(err)) @@ -361,24 +545,14 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu os.Remove(targetPath) return &csi.NodeUnpublishVolumeResponse{}, nil } + // unmount err = mount.CleanupMountPoint(targetPath, ns.mounter, false) if (err != nil) { return nil, err } - // unmap NBD device - if (refCount == 1) - { - if (!ns.useVduse) - { - unmapNbd(devicePath) - } - else - { - unmapVduse(ns.stateDir, devicePath) - } - } + return &csi.NodeUnpublishVolumeResponse{}, nil } @@ -397,7 +571,17 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV // NodeGetCapabilities returns the supported capabilities of the node server func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { - return &csi.NodeGetCapabilitiesResponse{}, nil + return &csi.NodeGetCapabilitiesResponse{ + Capabilities: []*csi.NodeServiceCapability{ + &csi.NodeServiceCapability{ + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, + }, + }, + }, + }, + }, nil } // NodeGetInfo returns NodeGetInfoResponse for CO.