// Copyright (c) Vitaliy Filippov, 2019+ // License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) package vitastor import ( "context" "os" "os/exec" "encoding/json" "strings" "bytes" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/utils/mount" utilexec "k8s.io/utils/exec" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "k8s.io/klog" ) // NodeServer struct of Vitastor CSI driver with supported methods of CSI node server spec. type NodeServer struct { *Driver mounter mount.Interface } // NewNodeServer create new instance node func NewNodeServer(driver *Driver) *NodeServer { return &NodeServer{ Driver: driver, mounter: mount.New(""), } } // NodeStageVolume mounts the volume to a staging path on the node. func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { return &csi.NodeStageVolumeResponse{}, nil } // NodeUnstageVolume unstages the volume from the staging path func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { return &csi.NodeUnstageVolumeResponse{}, nil } func Contains(list []string, s string) bool { for i := 0; i < len(list); i++ { if (list[i] == s) { return true } } return false } // NodePublishVolume mounts the volume mounted to the staging path to the target path func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { klog.Infof("received node publish volume request %+v", protosanitizer.StripSecrets(req)) targetPath := req.GetTargetPath() // Check that it's not already mounted free, error := mount.IsNotMountPoint(ns.mounter, targetPath) if (error != nil) { if (os.IsNotExist(error)) { error := os.MkdirAll(targetPath, 0777) if (error != nil) { return nil, status.Error(codes.Internal, error.Error()) } free = true } else { return nil, status.Error(codes.Internal, error.Error()) } } if (!free) { return &csi.NodePublishVolumeResponse{}, nil } ctxVars := make(map[string]string) err := json.Unmarshal([]byte(req.VolumeId), &ctxVars) if (err != nil) { return nil, status.Error(codes.Internal, "volume ID not in JSON format") } volName := ctxVars["name"] _, etcdUrl, etcdPrefix := GetConnectionParams(ctxVars) if (len(etcdUrl) == 0) { return nil, status.Error(codes.InvalidArgument, "no etcdUrl in storage class configuration and no etcd_address in vitastor.conf") } // Map NBD device // FIXME: Check if already mapped args := []string{ "map", "--etcd_address", strings.Join(etcdUrl, ","), "--etcd_prefix", etcdPrefix, "--image", volName, }; if (ctxVars["configPath"] != "") { args = append(args, "--config_path", ctxVars["configPath"]) } if (req.GetReadonly()) { args = append(args, "--readonly", "1") } c := exec.Command("/usr/bin/vitastor-nbd", args...) var stdout, stderr bytes.Buffer c.Stdout, c.Stderr = &stdout, &stderr err = c.Run() stdoutStr, stderrStr := string(stdout.Bytes()), string(stderr.Bytes()) if (err != nil) { klog.Errorf("vitastor-nbd map failed: %s, status %s\n", stdoutStr+stderrStr, err) return nil, status.Error(codes.Internal, stdoutStr+stderrStr+" (status "+err.Error()+")") } devicePath := strings.TrimSpace(stdoutStr) // Check existing format diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: utilexec.New()} existingFormat, err := diskMounter.GetDiskFormat(devicePath) if (err != nil) { klog.Errorf("failed to get disk format for path %s, error: %v", err) // unmap NBD device unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput() if (unmapErr != nil) { klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr) } return nil, err } // Format the device (ext4 or xfs) fsType := req.GetVolumeCapability().GetMount().GetFsType() isBlock := req.GetVolumeCapability().GetBlock() != nil opt := req.GetVolumeCapability().GetMount().GetMountFlags() opt = append(opt, "_netdev") if ((req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY || req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY) && !Contains(opt, "ro")) { opt = append(opt, "ro") } if (fsType == "xfs") { opt = append(opt, "nouuid") } readOnly := Contains(opt, "ro") if (existingFormat == "" && !readOnly) { args := []string{} switch fsType { case "ext4": args = []string{"-m0", "-Enodiscard,lazy_itable_init=1,lazy_journal_init=1", devicePath} case "xfs": args = []string{"-K", devicePath} } if (len(args) > 0) { cmdOut, cmdErr := diskMounter.Exec.Command("mkfs."+fsType, args...).CombinedOutput() if (cmdErr != nil) { klog.Errorf("failed to run mkfs error: %v, output: %v", cmdErr, string(cmdOut)) // unmap NBD device unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput() if (unmapErr != nil) { klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr) } return nil, status.Error(codes.Internal, cmdErr.Error()) } } } if (isBlock) { opt = append(opt, "bind") err = diskMounter.Mount(devicePath, targetPath, fsType, opt) } else { err = diskMounter.FormatAndMount(devicePath, targetPath, fsType, opt) } if (err != nil) { klog.Errorf( "failed to mount device path (%s) to path (%s) for volume (%s) error: %s", devicePath, targetPath, volName, err, ) // unmap NBD device unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput() if (unmapErr != nil) { klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr) } return nil, status.Error(codes.Internal, err.Error()) } return &csi.NodePublishVolumeResponse{}, nil } // NodeUnpublishVolume unmounts the volume from the target path func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { klog.Infof("received node unpublish volume request %+v", protosanitizer.StripSecrets(req)) targetPath := req.GetTargetPath() devicePath, refCount, err := mount.GetDeviceNameFromMount(ns.mounter, targetPath) if (err != nil) { if (os.IsNotExist(err)) { return nil, status.Error(codes.NotFound, "Target path not found") } return nil, status.Error(codes.Internal, err.Error()) } if (devicePath == "") { return nil, status.Error(codes.NotFound, "Volume not mounted") } // unmount err = mount.CleanupMountPoint(targetPath, ns.mounter, false) if (err != nil) { return nil, status.Error(codes.Internal, err.Error()) } // unmap NBD device if (refCount == 1) { unmapOut, unmapErr := exec.Command("/usr/bin/vitastor-nbd", "unmap", devicePath).CombinedOutput() if (unmapErr != nil) { klog.Errorf("failed to unmap NBD device %s: %s, error: %v", devicePath, unmapOut, unmapErr) } } return &csi.NodeUnpublishVolumeResponse{}, nil } // NodeGetVolumeStats returns volume capacity statistics available for the volume func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { return nil, status.Error(codes.Unimplemented, "") } // NodeExpandVolume expanding the file system on the node func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { return nil, status.Error(codes.Unimplemented, "") } // NodeGetCapabilities returns the supported capabilities of the node server func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { return &csi.NodeGetCapabilitiesResponse{}, nil } // NodeGetInfo returns NodeGetInfoResponse for CO. func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { klog.Infof("received node get info request %+v", protosanitizer.StripSecrets(req)) return &csi.NodeGetInfoResponse{ NodeId: ns.NodeID, }, nil }