Implement CSI volume staging (one device per volume on each host)
parent
ff2ccb6ab8
commit
ff91288288
|
@ -37,43 +37,13 @@ func NewNodeServer(driver *Driver) *NodeServer
|
|||
}
|
||||
}
|
||||
|
||||
// NodeStageVolume mounts the volume to a staging path on the node.
|
||||
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error)
|
||||
func (ns *NodeServer) checkMountPoint(targetPath string, isBlock bool)
|
||||
{
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// NodeUnstageVolume unstages the volume from the staging path
|
||||
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error)
|
||||
{
|
||||
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
func Contains(list []string, s string) bool
|
||||
{
|
||||
for i := 0; i < len(list); i++
|
||||
{
|
||||
if (list[i] == s)
|
||||
{
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// NodePublishVolume mounts the volume mounted to the staging path to the target path
|
||||
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error)
|
||||
{
|
||||
klog.Infof("received node publish volume request %+v", protosanitizer.StripSecrets(req))
|
||||
|
||||
targetPath := req.GetTargetPath()
|
||||
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||
|
||||
// Check that it's not already mounted
|
||||
_, error := mount.IsNotMountPoint(ns.mounter, targetPath)
|
||||
if (error != nil)
|
||||
_, err := mount.IsNotMountPoint(ns.mounter, targetPath)
|
||||
if (err != nil)
|
||||
{
|
||||
if (os.IsNotExist(error))
|
||||
if (os.IsNotExist(err))
|
||||
{
|
||||
if (isBlock)
|
||||
{
|
||||
|
@ -81,13 +51,13 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to create block device mount target %s with error: %v", targetPath, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return status.Error(codes.Internal, err.Error("failed to create block device mount target"))
|
||||
}
|
||||
err = pathFile.Close()
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to close %s with error: %v", targetPath, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return status.Error(codes.Internal, err.Error("failed to create block device mount target"))
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -96,15 +66,32 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
if (err != nil)
|
||||
{
|
||||
klog.Errorf("failed to create fs mount target %s with error: %v", targetPath, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return status.Error(codes.Internal, err.Error("failed to create fs mount target"))
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
return nil, status.Error(codes.Internal, error.Error())
|
||||
return status.Error(codes.Internal, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NodeStageVolume mounts the volume to a staging path on the node.
|
||||
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error)
|
||||
{
|
||||
klog.Infof("received node stage volume request %+v", protosanitizer.StripSecrets(req))
|
||||
|
||||
targetPath := req.GetStagingTargetPath()
|
||||
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||
|
||||
// Check that it's not already mounted
|
||||
err := ns.checkMountPoint(targetPath, isBlock)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctxVars := make(map[string]string)
|
||||
err := json.Unmarshal([]byte(req.VolumeId), &ctxVars)
|
||||
|
@ -121,7 +108,6 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
}
|
||||
|
||||
// Map NBD device
|
||||
// FIXME: Check if already mapped
|
||||
args := []string{
|
||||
"map", "--etcd_address", strings.Join(etcdUrl, ","),
|
||||
"--etcd_prefix", etcdPrefix,
|
||||
|
@ -226,6 +212,60 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
}
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// NodeUnstageVolume unstages the volume from the staging path
|
||||
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error)
|
||||
{
|
||||
klog.Infof("received node unstage volume request %+v", protosanitizer.StripSecrets(req))
|
||||
err := ns.unmount(req.GetStagingTargetPath())
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
func Contains(list []string, s string) bool
|
||||
{
|
||||
for i := 0; i < len(list); i++
|
||||
{
|
||||
if (list[i] == s)
|
||||
{
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// NodePublishVolume mounts the volume mounted to the staging path to the target path
|
||||
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error)
|
||||
{
|
||||
klog.Infof("received node publish volume request %+v", protosanitizer.StripSecrets(req))
|
||||
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
targetPath := req.GetTargetPath()
|
||||
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||
|
||||
err := ns.checkMountPoint(targetPath, isBlock)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
|
||||
diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: utilexec.New()}
|
||||
err = diskMounter.Mount(stagingTargetPath, targetPath, "", []string{"bind"})
|
||||
if (err != nil)
|
||||
{
|
||||
klog.Errorf(
|
||||
"failed to bind staging path (%s) to path (%s) for volume (%s) error: %s",
|
||||
stagingPath, targetPath, volName, err,
|
||||
)
|
||||
return nil, status.Error(codes.Internal, err.Error("failed to bind staging path"))
|
||||
}
|
||||
|
||||
return &csi.NodePublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
|
@ -233,25 +273,34 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error)
|
||||
{
|
||||
klog.Infof("received node unpublish volume request %+v", protosanitizer.StripSecrets(req))
|
||||
targetPath := req.GetTargetPath()
|
||||
err := ns.unmount(req.GetTargetPath())
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, err
|
||||
}
|
||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
func (ns *NodeServer) unmount(targetPath string) error
|
||||
{
|
||||
devicePath, refCount, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath)
|
||||
if (err != nil)
|
||||
{
|
||||
if (os.IsNotExist(err))
|
||||
{
|
||||
return nil, status.Error(codes.NotFound, "Target path not found")
|
||||
return status.Error(codes.NotFound, "Target path not found")
|
||||
}
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
if (devicePath == "")
|
||||
{
|
||||
return nil, status.Error(codes.NotFound, "Volume not mounted")
|
||||
return status.Error(codes.NotFound, "Volume not mounted")
|
||||
}
|
||||
// unmount
|
||||
err = mount.CleanupMountPoint(targetPath, ns.mounter, false)
|
||||
if (err != nil)
|
||||
{
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
// unmap NBD device
|
||||
if (refCount == 1)
|
||||
|
@ -262,7 +311,7 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
|||
klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr)
|
||||
}
|
||||
}
|
||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// NodeGetVolumeStats returns volume capacity statistics available for the volume
|
||||
|
|
Loading…
Reference in New Issue